1
package elemental
2

3
import (
4
	"context"
5
	"sync"
6
)
7

8
// AtomicJob takes a func() error and returns a func(context.Context) error.
9
// The returned function can be called as many time as you like, but only
10
// one instance of the given job can be run at the same time.
11
//
12
// The returned function will either execute job if it
13
// it not already running or wait for the currently running job to finish.
14
// In both cases, the returned error from the job will be forwareded and returned
15
// to every caller.
16
//
17
// You must pass a context.Context to the returned function so you can
18
// control how much time you are willing to wait for the job to complete.
19
//
20
// If you wish to change some external state from within the job function,
21
// it is your responsibility to ensure everything is thread safe.
22
func AtomicJob(job func() error) func(context.Context) error {
23

24 14
	var l sync.RWMutex
25 14
	var errorChs []chan error
26

27 14
	sem := make(chan struct{}, 1)
28

29 14
	return func(ctx context.Context) error {
30

31 14
		errCh := make(chan error)
32

33 14
		l.Lock()
34 14
		errorChs = append(errorChs, errCh)
35 14
		l.Unlock()
36

37 14
		select {
38 14
		case sem <- struct{}{}:
39

40 14
			go func() {
41

42 14
				err := job()
43

44 14
				l.Lock()
45
				for _, ch := range errorChs {
46 14
					select {
47 14
					case ch <- err:
48 14
					default:
49
					}
50
				}
51 14
				errorChs = nil
52 14
				l.Unlock()
53

54 14
				<-sem
55
			}()
56

57 14
		default:
58
		}
59

60 14
		select {
61 14
		case err := <-errCh:
62 14
			return err
63 14
		case <-ctx.Done():
64 14
			return ctx.Err()
65
		}
66
	}
67
}

Read our documentation on viewing source code .

Loading