Skip to content

Commit 5cd3266

Browse files
committed
feat: adding ExampleNewThrottleBy and ExampleNewThrottleByWithCount
1 parent fdd8865 commit 5cd3266

File tree

4 files changed

+234
-18
lines changed

4 files changed

+234
-18
lines changed

README.md

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
# lo - Iterate over slices, maps, channels...
23

34
[![tag](https://img.shields.io/github/tag/samber/lo.svg)](https://github.com/samber/lo/releases)
@@ -3419,11 +3420,12 @@ cancel("second key")
34193420
[[play](https://go.dev/play/p/d3Vpt6pxhY8)]
34203421

34213422
### Throttle
3422-
`NewThrottle` creates a throttled instance that invokes given functions only once in every interval.
3423+
3424+
Creates a throttled instance that invokes given functions only once in every interval.
3425+
34233426
This returns 2 functions, First one is throttled function and Second one is a function to reset interval.
34243427

34253428
```go
3426-
34273429
f := func() {
34283430
println("Called once in every 100ms")
34293431
}
@@ -3437,17 +3439,16 @@ for j := 0; j < 10; j++ {
34373439

34383440
reset()
34393441
throttle()
3440-
34413442
```
34423443

34433444
`NewThrottleWithCount` is NewThrottle with count limit, throttled function will be invoked count times in every interval.
3444-
```go
34453445

3446+
```go
34463447
f := func() {
34473448
println("Called three times in every 100ms")
34483449
}
34493450

3450-
throttle, reset := lo.NewThrottle(100 * time.Millisecond, f)
3451+
throttle, reset := lo.NewThrottleWithCount(100 * time.Millisecond, f)
34513452

34523453
for j := 0; j < 10; j++ {
34533454
throttle()
@@ -3456,7 +3457,24 @@ for j := 0; j < 10; j++ {
34563457

34573458
reset()
34583459
throttle()
3460+
```
3461+
3462+
`NewThrottleBy` and `NewThrottleByWithCount` are NewThrottle with sharding key, throttled function will be invoked count times in every interval.
34593463

3464+
```go
3465+
f := func(key string) {
3466+
println(key, "Called three times in every 100ms")
3467+
}
3468+
3469+
throttle, reset := lo.NewThrottleByWithCount(100 * time.Millisecond, f)
3470+
3471+
for j := 0; j < 10; j++ {
3472+
throttle("foo")
3473+
time.Sleep(30 * time.Millisecond)
3474+
}
3475+
3476+
reset()
3477+
throttle()
34603478
```
34613479

34623480
### Synchronize

retry.go

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -287,23 +287,29 @@ func (t *Transaction[T]) Process(state T) (T, error) {
287287
return state, err
288288
}
289289

290-
type throttle struct {
290+
// @TODO: single mutex per key ?
291+
type throttleBy[T comparable] struct {
291292
mu *sync.Mutex
292293
timer *time.Timer
293294
interval time.Duration
294-
callbacks []func()
295+
callbacks []func(key T)
295296
countLimit int
296-
count int
297+
count map[T]int
297298
}
298299

299-
func (th *throttle) throttledFunc() {
300+
func (th *throttleBy[T]) throttledFunc(key T) {
300301
th.mu.Lock()
301302
defer th.mu.Unlock()
302-
if th.count < th.countLimit {
303-
th.count++
303+
304+
if _, ok := th.count[key]; !ok {
305+
th.count[key] = 0
306+
}
307+
308+
if th.count[key] < th.countLimit {
309+
th.count[key]++
304310

305311
for _, f := range th.callbacks {
306-
f()
312+
f(key)
307313
}
308314

309315
}
@@ -314,35 +320,56 @@ func (th *throttle) throttledFunc() {
314320
}
315321
}
316322

317-
func (th *throttle) reset() {
323+
func (th *throttleBy[T]) reset() {
318324
th.mu.Lock()
319325
defer th.mu.Unlock()
320326

321327
if th.timer != nil {
322328
th.timer.Stop()
323329
}
324330

325-
th.count = 0
331+
th.count = map[T]int{}
326332
th.timer = nil
327-
328333
}
329334

330335
// NewThrottle creates a throttled instance that invokes given functions only once in every interval.
331336
// This returns 2 functions, First one is throttled function and Second one is a function to reset interval
332-
func NewThrottle(interval time.Duration, f ...func()) (func(), func()) {
337+
func NewThrottle(interval time.Duration, f ...func()) (throttle func(), reset func()) {
333338
return NewThrottleWithCount(interval, 1, f...)
334339
}
335340

336341
// NewThrottleWithCount is NewThrottle with count limit, throttled function will be invoked count times in every interval.
337-
func NewThrottleWithCount(interval time.Duration, count int, f ...func()) (func(), func()) {
342+
func NewThrottleWithCount(interval time.Duration, count int, f ...func()) (throttle func(), reset func()) {
343+
callbacks := Map(f, func(item func(), _ int) func(struct{}) {
344+
return func(struct{}) {
345+
item()
346+
}
347+
})
348+
349+
throttleFn, reset := NewThrottleByWithCount[struct{}](interval, count, callbacks...)
350+
return func() {
351+
throttleFn(struct{}{})
352+
}, reset
353+
}
354+
355+
// NewThrottleBy creates a throttled instance that invokes given functions only once in every interval.
356+
// This returns 2 functions, First one is throttled function and Second one is a function to reset interval
357+
func NewThrottleBy[T comparable](interval time.Duration, f ...func(key T)) (throttle func(key T), reset func()) {
358+
return NewThrottleByWithCount[T](interval, 1, f...)
359+
}
360+
361+
// NewThrottleByWithCount is NewThrottleBy with count limit, throttled function will be invoked count times in every interval.
362+
func NewThrottleByWithCount[T comparable](interval time.Duration, count int, f ...func(key T)) (throttle func(key T), reset func()) {
338363
if count <= 0 {
339364
count = 1
340365
}
341-
th := &throttle{
366+
367+
th := &throttleBy[T]{
342368
mu: new(sync.Mutex),
343369
interval: interval,
344370
callbacks: f,
345371
countLimit: count,
372+
count: map[T]int{},
346373
}
347374
return th.throttledFunc, th.reset
348375
}

retry_example_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,92 @@ func ExampleTransaction_error() {
249249
// -5
250250
// error
251251
}
252+
253+
func ExampleNewThrottle() {
254+
throttle, reset := NewThrottle(100*time.Millisecond, func() {
255+
fmt.Println("Called once in every 100ms")
256+
})
257+
258+
for j := 0; j < 10; j++ {
259+
throttle()
260+
time.Sleep(30 * time.Millisecond)
261+
}
262+
263+
reset()
264+
265+
// Output:
266+
// Called once in every 100ms
267+
// Called once in every 100ms
268+
// Called once in every 100ms
269+
}
270+
271+
func ExampleNewThrottleWithCount() {
272+
throttle, reset := NewThrottleWithCount(100*time.Millisecond, 2, func() {
273+
fmt.Println("Called once in every 100ms")
274+
})
275+
276+
for j := 0; j < 10; j++ {
277+
throttle()
278+
time.Sleep(30 * time.Millisecond)
279+
}
280+
281+
reset()
282+
283+
// Output:
284+
// Called once in every 100ms
285+
// Called once in every 100ms
286+
// Called once in every 100ms
287+
// Called once in every 100ms
288+
// Called once in every 100ms
289+
// Called once in every 100ms
290+
}
291+
292+
func ExampleNewThrottleBy() {
293+
throttle, reset := NewThrottleBy(100*time.Millisecond, func(key string) {
294+
fmt.Println(key, "Called once in every 100ms")
295+
})
296+
297+
for j := 0; j < 10; j++ {
298+
throttle("foo")
299+
throttle("bar")
300+
time.Sleep(30 * time.Millisecond)
301+
}
302+
303+
reset()
304+
305+
// Output:
306+
// foo Called once in every 100ms
307+
// bar Called once in every 100ms
308+
// foo Called once in every 100ms
309+
// bar Called once in every 100ms
310+
// foo Called once in every 100ms
311+
// bar Called once in every 100ms
312+
}
313+
314+
func ExampleNewThrottleByWithCount() {
315+
throttle, reset := NewThrottleByWithCount(100*time.Millisecond, 2, func(key string) {
316+
fmt.Println(key, "Called once in every 100ms")
317+
})
318+
319+
for j := 0; j < 10; j++ {
320+
throttle("foo")
321+
throttle("bar")
322+
time.Sleep(30 * time.Millisecond)
323+
}
324+
325+
reset()
326+
327+
// Output:
328+
// foo Called once in every 100ms
329+
// bar Called once in every 100ms
330+
// foo Called once in every 100ms
331+
// bar Called once in every 100ms
332+
// foo Called once in every 100ms
333+
// bar Called once in every 100ms
334+
// foo Called once in every 100ms
335+
// bar Called once in every 100ms
336+
// foo Called once in every 100ms
337+
// bar Called once in every 100ms
338+
// foo Called once in every 100ms
339+
// bar Called once in every 100ms
340+
}

retry_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,3 +559,85 @@ func TestNewThrottleWithCount(t *testing.T) {
559559

560560
is.Equal(9, callCount)
561561
}
562+
563+
func TestNewThrottleBy(t *testing.T) {
564+
t.Parallel()
565+
is := assert.New(t)
566+
callCountA := 0
567+
callCountB := 0
568+
f1 := func(key string) {
569+
if key == "a" {
570+
callCountA++
571+
} else {
572+
callCountB++
573+
}
574+
}
575+
th, reset := NewThrottleBy[string](10*time.Millisecond, f1)
576+
577+
is.Equal(0, callCountA)
578+
is.Equal(0, callCountB)
579+
for j := 0; j < 100; j++ {
580+
th("a")
581+
th("b")
582+
}
583+
is.Equal(1, callCountA)
584+
is.Equal(1, callCountB)
585+
586+
time.Sleep(15 * time.Millisecond)
587+
588+
for j := 0; j < 100; j++ {
589+
th("a")
590+
th("b")
591+
}
592+
593+
is.Equal(2, callCountA)
594+
is.Equal(2, callCountB)
595+
596+
// reset counter
597+
reset()
598+
th("a")
599+
is.Equal(3, callCountA)
600+
is.Equal(2, callCountB)
601+
602+
}
603+
604+
func TestNewThrottleByWithCount(t *testing.T) {
605+
t.Parallel()
606+
is := assert.New(t)
607+
callCountA := 0
608+
callCountB := 0
609+
f1 := func(key string) {
610+
if key == "a" {
611+
callCountA++
612+
} else {
613+
callCountB++
614+
}
615+
}
616+
th, reset := NewThrottleByWithCount(10*time.Millisecond, 3, f1)
617+
618+
// the function does not throttle for initial count number
619+
for i := 0; i < 20; i++ {
620+
th("a")
621+
th("b")
622+
}
623+
is.Equal(3, callCountA)
624+
is.Equal(3, callCountB)
625+
626+
time.Sleep(11 * time.Millisecond)
627+
628+
for i := 0; i < 20; i++ {
629+
th("a")
630+
th("b")
631+
}
632+
633+
is.Equal(6, callCountA)
634+
is.Equal(6, callCountB)
635+
636+
reset()
637+
for i := 0; i < 20; i++ {
638+
th("a")
639+
}
640+
641+
is.Equal(9, callCountA)
642+
is.Equal(6, callCountB)
643+
}

0 commit comments

Comments
 (0)