diff options
Diffstat (limited to 'middleware/pkg')
-rw-r--r-- | middleware/pkg/cache/cache.go | 129 | ||||
-rw-r--r-- | middleware/pkg/cache/cache_test.go | 31 | ||||
-rw-r--r-- | middleware/pkg/cache/shard_test.go | 60 | ||||
-rw-r--r-- | middleware/pkg/singleflight/singleflight.go | 6 | ||||
-rw-r--r-- | middleware/pkg/singleflight/singleflight_test.go | 6 |
5 files changed, 226 insertions, 6 deletions
diff --git a/middleware/pkg/cache/cache.go b/middleware/pkg/cache/cache.go new file mode 100644 index 000000000..56cae2180 --- /dev/null +++ b/middleware/pkg/cache/cache.go @@ -0,0 +1,129 @@ +// Package cache implements a cache. The cache hold 256 shards, each shard +// holds a cache: a map with a mutex. There is no fancy expunge algorithm, it +// just randomly evicts elements when it gets full. +package cache + +import ( + "hash/fnv" + "sync" +) + +// Hash returns the FNV hash of what. +func Hash(what []byte) uint32 { + h := fnv.New32() + h.Write(what) + return h.Sum32() +} + +// Cache is cache. +type Cache struct { + shards [shardSize]*shard +} + +// shard is a cache with random eviction. +type shard struct { + items map[uint32]interface{} + size int + + sync.RWMutex +} + +// New returns a new cache. +func New(size int) *Cache { + ssize := size / shardSize + if ssize < 512 { + ssize = 512 + } + + c := &Cache{} + + // Initialize all the shards + for i := 0; i < shardSize; i++ { + c.shards[i] = newShard(ssize) + } + return c +} + +// Add adds a new element to the cache. If the element already exists it is overwritten. +func (c *Cache) Add(key uint32, el interface{}) { + shard := key & (shardSize - 1) + c.shards[shard].Add(key, el) +} + +// Get looks up element index under key. +func (c *Cache) Get(key uint32) (interface{}, bool) { + shard := key & (shardSize - 1) + return c.shards[shard].Get(key) +} + +// Remove removes the element indexed with key. +func (c *Cache) Remove(key uint32) { + shard := key & (shardSize - 1) + c.shards[shard].Remove(key) +} + +// Len returns the number of elements in the cache. +func (c *Cache) Len() int { + l := 0 + for _, s := range c.shards { + l += s.Len() + } + return l +} + +// newShard returns a new shard with size. +func newShard(size int) *shard { return &shard{items: make(map[uint32]interface{}), size: size} } + +// Add adds element indexed by key into the cache. Any existing element is overwritten +func (s *shard) Add(key uint32, el interface{}) { + l := s.Len() + if l+1 > s.size { + s.Evict() + } + + s.Lock() + s.items[key] = el + s.Unlock() +} + +// Remove removes the element indexed by key from the cache. +func (s *shard) Remove(key uint32) { + s.Lock() + delete(s.items, key) + s.Unlock() +} + +// Evict removes a random element from the cache. +func (s *shard) Evict() { + s.Lock() + defer s.Unlock() + + key := -1 + for k := range s.items { + key = int(k) + break + } + if key == -1 { + // empty cache + return + } + delete(s.items, uint32(key)) +} + +// Get looks up the element indexed under key. +func (s *shard) Get(key uint32) (interface{}, bool) { + s.RLock() + el, found := s.items[key] + s.RUnlock() + return el, found +} + +// Len returns the current length of the cache. +func (s *shard) Len() int { + s.RLock() + l := len(s.items) + s.RUnlock() + return l +} + +const shardSize = 256 diff --git a/middleware/pkg/cache/cache_test.go b/middleware/pkg/cache/cache_test.go new file mode 100644 index 000000000..2c92bf438 --- /dev/null +++ b/middleware/pkg/cache/cache_test.go @@ -0,0 +1,31 @@ +package cache + +import "testing" + +func TestCacheAddAndGet(t *testing.T) { + c := New(4) + c.Add(1, 1) + + if _, found := c.Get(1); !found { + t.Fatal("Failed to find inserted record") + } +} + +func TestCacheLen(t *testing.T) { + c := New(4) + + c.Add(1, 1) + if l := c.Len(); l != 1 { + t.Fatalf("Cache size should %d, got %d", 1, l) + } + + c.Add(1, 1) + if l := c.Len(); l != 1 { + t.Fatalf("Cache size should %d, got %d", 1, l) + } + + c.Add(2, 2) + if l := c.Len(); l != 2 { + t.Fatalf("Cache size should %d, got %d", 2, l) + } +} diff --git a/middleware/pkg/cache/shard_test.go b/middleware/pkg/cache/shard_test.go new file mode 100644 index 000000000..26675cee1 --- /dev/null +++ b/middleware/pkg/cache/shard_test.go @@ -0,0 +1,60 @@ +package cache + +import "testing" + +func TestShardAddAndGet(t *testing.T) { + s := newShard(4) + s.Add(1, 1) + + if _, found := s.Get(1); !found { + t.Fatal("Failed to find inserted record") + } +} + +func TestShardLen(t *testing.T) { + s := newShard(4) + + s.Add(1, 1) + if l := s.Len(); l != 1 { + t.Fatalf("Shard size should %d, got %d", 1, l) + } + + s.Add(1, 1) + if l := s.Len(); l != 1 { + t.Fatalf("Shard size should %d, got %d", 1, l) + } + + s.Add(2, 2) + if l := s.Len(); l != 2 { + t.Fatalf("Shard size should %d, got %d", 2, l) + } +} + +func TestShardEvict(t *testing.T) { + s := newShard(1) + s.Add(1, 1) + s.Add(2, 2) + // 1 should be gone + + if _, found := s.Get(1); found { + t.Fatal("Found item that should have been evicted") + } +} + +func TestShardLenEvict(t *testing.T) { + s := newShard(4) + s.Add(1, 1) + s.Add(2, 1) + s.Add(3, 1) + s.Add(4, 1) + + if l := s.Len(); l != 4 { + t.Fatalf("Shard size should %d, got %d", 4, l) + } + + // This should evict one element + s.Add(5, 1) + if l := s.Len(); l != 4 { + t.Fatalf("Shard size should %d, got %d", 4, l) + } +} diff --git a/middleware/pkg/singleflight/singleflight.go b/middleware/pkg/singleflight/singleflight.go index ff2c2ee4f..365e3ef58 100644 --- a/middleware/pkg/singleflight/singleflight.go +++ b/middleware/pkg/singleflight/singleflight.go @@ -31,17 +31,17 @@ type call struct { // units of work can be executed with duplicate suppression. type Group struct { mu sync.Mutex // protects m - m map[string]*call // lazily initialized + m map[uint32]*call // lazily initialized } // Do executes and returns the results of the given function, making // sure that only one execution is in-flight for a given key at a // time. If a duplicate comes in, the duplicate caller waits for the // original to complete and receives the same results. -func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { +func (g *Group) Do(key uint32, fn func() (interface{}, error)) (interface{}, error) { g.mu.Lock() if g.m == nil { - g.m = make(map[string]*call) + g.m = make(map[uint32]*call) } if c, ok := g.m[key]; ok { g.mu.Unlock() diff --git a/middleware/pkg/singleflight/singleflight_test.go b/middleware/pkg/singleflight/singleflight_test.go index 47b4d3dc0..d1d406e0b 100644 --- a/middleware/pkg/singleflight/singleflight_test.go +++ b/middleware/pkg/singleflight/singleflight_test.go @@ -27,7 +27,7 @@ import ( func TestDo(t *testing.T) { var g Group - v, err := g.Do("key", func() (interface{}, error) { + v, err := g.Do(1, func() (interface{}, error) { return "bar", nil }) if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { @@ -41,7 +41,7 @@ func TestDo(t *testing.T) { func TestDoErr(t *testing.T) { var g Group someErr := errors.New("Some error") - v, err := g.Do("key", func() (interface{}, error) { + v, err := g.Do(1, func() (interface{}, error) { return nil, someErr }) if err != someErr { @@ -66,7 +66,7 @@ func TestDoDupSuppress(t *testing.T) { for i := 0; i < n; i++ { wg.Add(1) go func() { - v, err := g.Do("key", fn) + v, err := g.Do(1, fn) if err != nil { t.Errorf("Do error: %v", err) } |