aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugin/pkg/up/up.go66
-rw-r--r--plugin/pkg/up/up_test.go40
2 files changed, 106 insertions, 0 deletions
diff --git a/plugin/pkg/up/up.go b/plugin/pkg/up/up.go
new file mode 100644
index 000000000..af8de5ed5
--- /dev/null
+++ b/plugin/pkg/up/up.go
@@ -0,0 +1,66 @@
+package up
+
+import (
+ "sync"
+ "time"
+)
+
+// Probe is used to run a single Func until it returns true (indicating a target is healthy). If an Func
+// is already in progress no new one will be added, i.e. there is always a maximum of 1 checks in flight.
+type Probe struct {
+ do chan Func
+ stop chan bool
+
+ target string
+
+ sync.Mutex
+ inprogress bool
+}
+
+// Func is used to determine if a target is alive. If so this function must return true.
+type Func func(target string) bool
+
+// New returns a pointer to an intialized Probe.
+func New() *Probe {
+ return &Probe{stop: make(chan bool), do: make(chan Func)}
+}
+
+// Do will probe target, if a probe is already in progress this is a noop.
+func (p *Probe) Do(f Func) { p.do <- f }
+
+// Stop stops the probing.
+func (p *Probe) Stop() { p.stop <- true }
+
+// Start will start the probe manager, after which probes can be initialized with Do.
+func (p *Probe) Start(target string, interval time.Duration) { go p.start(target, interval) }
+
+func (p *Probe) start(target string, interval time.Duration) {
+ for {
+ select {
+ case <-p.stop:
+ return
+ case f := <-p.do:
+ p.Lock()
+ if p.inprogress {
+ p.Unlock()
+ continue
+ }
+ p.inprogress = true
+ p.Unlock()
+
+ // Passed the lock. Now run f for as long it returns false. If a true is returned
+ // we return from the goroutine and we can accept another Func to run.
+ go func() {
+ for {
+ if ok := f(target); ok {
+ break
+ }
+ time.Sleep(interval)
+ }
+ p.Lock()
+ p.inprogress = false
+ p.Unlock()
+ }()
+ }
+ }
+}
diff --git a/plugin/pkg/up/up_test.go b/plugin/pkg/up/up_test.go
new file mode 100644
index 000000000..cb56658d1
--- /dev/null
+++ b/plugin/pkg/up/up_test.go
@@ -0,0 +1,40 @@
+package up
+
+import (
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestUp(t *testing.T) {
+ pr := New()
+ wg := sync.WaitGroup{}
+ hits := int32(0)
+
+ upfunc := func(s string) bool {
+ atomic.AddInt32(&hits, 1)
+ // Sleep tiny amount so that our other pr.Do() calls hit the lock.
+ time.Sleep(3 * time.Millisecond)
+ wg.Done()
+ return true
+ }
+
+ pr.Start("nonexistent", 5*time.Millisecond)
+ defer pr.Stop()
+
+ // These functions AddInt32 to the same hits variable, but we only want to wait when
+ // upfunc finishes, as that only calls Done() on the waitgroup.
+ upfuncNoWg := func(s string) bool { atomic.AddInt32(&hits, 1); return true }
+ wg.Add(1)
+ pr.Do(upfunc)
+ pr.Do(upfuncNoWg)
+ pr.Do(upfuncNoWg)
+
+ wg.Wait()
+
+ h := atomic.LoadInt32(&hits)
+ if h != 1 {
+ t.Errorf("Expected hits to be %d, got %d", 1, h)
+ }
+}