1
package waitgroup
2

3
import (
4
	"context"
5
	"sync"
6
	"time"
7
)
8

9
// WaitGroup lets callers wait, per key and with a timeout, for other in-flight
// work registered under the same key.
//
// Every key present in groups maps to one call that holds a duplicate counter
// and a context created with the configured timeout. All methods take mu
// before touching groups. A WaitGroup contains a mutex and must not be copied;
// use it through the pointer returned by New.
type WaitGroup struct {
	mu sync.RWMutex

	groups map[uint64]*call

	timeout time.Duration
}

// call is the per-key state tracked by WaitGroup.
type call struct {
	// ctx is done once cancel is called or the timeout elapses.
	ctx context.Context
	// dups counts Add calls for the key that Done has not yet released.
	dups int
	// cancel cancels ctx.
	cancel context.CancelFunc
}

// New returns a WaitGroup whose per-key contexts expire after timeout.
func New(timeout time.Duration) *WaitGroup {
	return &WaitGroup{
		groups:  make(map[uint64]*call),
		timeout: timeout,
	}
}

// Get returns the duplicate count registered under key, or 0 when key is not
// being tracked.
func (wg *WaitGroup) Get(key uint64) int {
	wg.mu.RLock()
	defer wg.mu.RUnlock()

	c, ok := wg.groups[key]
	if !ok {
		return 0
	}

	return c.dups
}

// Wait blocks until the context registered under key is done, that is until
// the last matching Done cancels it or its timeout elapses. It returns
// immediately when key is not being tracked.
func (wg *WaitGroup) Wait(key uint64) {
	wg.mu.RLock()
	c, ok := wg.groups[key]
	wg.mu.RUnlock()

	if !ok {
		return
	}

	// The lock is released before blocking so that Add and Done can proceed.
	// ctx is assigned once in Add before the entry is stored in groups and is
	// never reassigned, so reading it here without the lock is safe.
	<-c.ctx.Done()
}

// Add registers a caller under key. The first caller creates the entry and
// starts its timeout; later callers only increment the duplicate count.
func (wg *WaitGroup) Add(key uint64) {
	wg.mu.Lock()
	defer wg.mu.Unlock()

	if c, ok := wg.groups[key]; ok {
		// The existing context, and therefore its original deadline, is
		// reused as is, even if it has already timed out.
		c.dups++
		return
	}

	c := &call{dups: 1}
	c.ctx, c.cancel = context.WithTimeout(context.Background(), wg.timeout)
	wg.groups[key] = c
}

// Done releases one caller registered under key. While more than one caller
// remains it only decrements the duplicate count; for the last caller it
// cancels the context, which unblocks every Wait on key, and removes the
// entry. Calling Done for a key that is not being tracked is a no-op.
func (wg *WaitGroup) Done(key uint64) {
	wg.mu.Lock()
	defer wg.mu.Unlock()

	c, ok := wg.groups[key]
	if !ok {
		return
	}

	if c.dups > 1 {
		c.dups--
		return
	}

	c.cancel()
	delete(wg.groups, key)
}

Read our documentation on viewing source code.

Loading