diff options
-rw-r--r-- | plugin/pkg/up/up.go | 66 | ||||
-rw-r--r-- | plugin/pkg/up/up_test.go | 40 |
2 files changed, 106 insertions, 0 deletions
diff --git a/plugin/pkg/up/up.go b/plugin/pkg/up/up.go new file mode 100644 index 000000000..af8de5ed5 --- /dev/null +++ b/plugin/pkg/up/up.go @@ -0,0 +1,66 @@ +package up + +import ( + "sync" + "time" +) + +// Probe is used to run a single Func until it returns true (indicating a target is healthy). If an Func +// is already in progress no new one will be added, i.e. there is always a maximum of 1 checks in flight. +type Probe struct { + do chan Func + stop chan bool + + target string + + sync.Mutex + inprogress bool +} + +// Func is used to determine if a target is alive. If so this function must return true. +type Func func(target string) bool + +// New returns a pointer to an intialized Probe. +func New() *Probe { + return &Probe{stop: make(chan bool), do: make(chan Func)} +} + +// Do will probe target, if a probe is already in progress this is a noop. +func (p *Probe) Do(f Func) { p.do <- f } + +// Stop stops the probing. +func (p *Probe) Stop() { p.stop <- true } + +// Start will start the probe manager, after which probes can be initialized with Do. +func (p *Probe) Start(target string, interval time.Duration) { go p.start(target, interval) } + +func (p *Probe) start(target string, interval time.Duration) { + for { + select { + case <-p.stop: + return + case f := <-p.do: + p.Lock() + if p.inprogress { + p.Unlock() + continue + } + p.inprogress = true + p.Unlock() + + // Passed the lock. Now run f for as long it returns false. If a true is returned + // we return from the goroutine and we can accept another Func to run. + go func() { + for { + if ok := f(target); ok { + break + } + time.Sleep(interval) + } + p.Lock() + p.inprogress = false + p.Unlock() + }() + } + } +} diff --git a/plugin/pkg/up/up_test.go b/plugin/pkg/up/up_test.go new file mode 100644 index 000000000..cb56658d1 --- /dev/null +++ b/plugin/pkg/up/up_test.go @@ -0,0 +1,40 @@ +package up + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestUp(t *testing.T) { + pr := New() + wg := sync.WaitGroup{} + hits := int32(0) + + upfunc := func(s string) bool { + atomic.AddInt32(&hits, 1) + // Sleep tiny amount so that our other pr.Do() calls hit the lock. + time.Sleep(3 * time.Millisecond) + wg.Done() + return true + } + + pr.Start("nonexistent", 5*time.Millisecond) + defer pr.Stop() + + // These functions AddInt32 to the same hits variable, but we only want to wait when + // upfunc finishes, as that only calls Done() on the waitgroup. + upfuncNoWg := func(s string) bool { atomic.AddInt32(&hits, 1); return true } + wg.Add(1) + pr.Do(upfunc) + pr.Do(upfuncNoWg) + pr.Do(upfuncNoWg) + + wg.Wait() + + h := atomic.LoadInt32(&hits) + if h != 1 { + t.Errorf("Expected hits to be %d, got %d", 1, h) + } +} |