1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
package manager
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/buraksezer/consistent"
"github.com/cespare/xxhash/v2"
"github.com/redis/go-redis/v9"
)
// MonitorInterval is the period between reconciliation passes started by
// Start; it is also the timeout applied to each individual pass.
const MonitorInterval = 5 * time.Second

// ActiveWorkersSet is the redis set whose members are the worker hostnames
// the monitor checks for heartbeats.
const ActiveWorkersSet = "active-workers"
// WorkerMonitor is a struct that monitors workers and their heartbeats over redis.
// It keeps a consistent hash ring containing the workers listed in the
// ActiveWorkersSet whose heartbeat keys exist. Start runs the reconciliation
// loop; Changes exposes a channel that is signalled when the ring is updated.
type WorkerMonitor struct {
	// client is used to read the ActiveWorkersSet and the workers' heartbeat keys.
	client *redis.Client
	// ring is a consistent hash ring that distributes partitions over detected workers.
	// Members are identified by worker hostname.
	ring *consistent.Consistent
	// layoutChangeCh is a channel that others can listen to for layout changes to the ring.
	// addWorker and removeWorker send on it without blocking, so a signal is
	// dropped if the channel cannot accept it at that moment. Close closes it.
	layoutChangeCh chan struct{}
}
// NewWorkerMonitor creates a new WorkerMonitor that reads worker state through
// client. The ring starts with no members; it is populated by the passes that
// Start runs.
func NewWorkerMonitor(client *redis.Client) *WorkerMonitor {
	return &WorkerMonitor{
		client: client,
		// A nil member list yields an empty ring.
		ring: consistent.New(nil, consistent.Config{
			Hasher:            new(hasher),
			PartitionCount:    consistent.DefaultPartitionCount,
			ReplicationFactor: consistent.DefaultReplicationFactor,
			Load:              consistent.DefaultLoad,
		}),
		// Capacity 1 is deliberate. addWorker/removeWorker use a non-blocking
		// send, so with an unbuffered channel a layout change was silently lost
		// whenever no listener was parked in a receive at that exact moment.
		// With one slot, a change is remembered until a listener reads it, and
		// any further changes before then coalesce into that single pending
		// signal.
		layoutChangeCh: make(chan struct{}, 1),
	}
}
// Close closes the channel returned by Changes, so listeners observe it as
// closed. It always returns nil.
//
// Close must be called at most once, and only after Start has returned:
// addWorker and removeWorker send on the same channel, and a send on a
// closed channel panics even inside a select with a default case.
func (m *WorkerMonitor) Close() error {
	ch := m.layoutChangeCh
	close(ch)
	return nil
}
// Changes returns a receive-only view of the layout-change channel. Signals
// are sent on a best-effort (non-blocking) basis when workers are added to or
// removed from the ring; the channel is closed by Close.
func (m *WorkerMonitor) Changes() <-chan struct{} {
	var out <-chan struct{} = m.layoutChangeCh
	return out
}
// Start reconciles the ring with redis once right away and then once every
// MonitorInterval. It blocks until ctx is done.
func (m *WorkerMonitor) Start(ctx context.Context) {
	// First pass happens immediately so the ring does not stay empty for a
	// whole interval.
	m.monitorWorkers(ctx)

	tick := time.NewTicker(MonitorInterval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			m.monitorWorkers(ctx)
		case <-done:
			return
		}
	}
}
// monitorWorkers performs one reconciliation pass between redis and the ring:
//   - workers in ActiveWorkersSet whose heartbeat key is missing are removed
//     from the set and, if present, from the ring;
//   - workers with a heartbeat that are not yet in the ring are added;
//   - ring members no longer listed in ActiveWorkersSet are removed.
//
// The whole pass is bounded by MonitorInterval. If a worker's heartbeat
// cannot be checked, its ring membership is left unchanged, so transient
// redis errors (including the pass timing out) do not reshuffle partitions.
func (m *WorkerMonitor) monitorWorkers(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, MonitorInterval)
	defer cancel()

	// Get all active workers.
	workers, err := m.client.SMembers(ctx, ActiveWorkersSet).Result()
	if err != nil {
		slog.ErrorContext(ctx, "Unable to get active workers", "error", err)
		return
	}

	// Current ring members, each marked false until accounted for during the
	// heartbeat loop. Anything still false afterwards is evicted.
	existingWorkers := m.ring.GetMembers()
	inRing := make(map[string]bool, len(existingWorkers))
	for _, worker := range existingWorkers {
		inRing[worker.String()] = false
	}

	// Check workers' heartbeats.
	for _, worker := range workers {
		exists, err := m.client.Exists(ctx, WorkerHeartbeatKey(worker)).Result()
		if err != nil {
			slog.ErrorContext(ctx, "Unable to check worker heartbeat", "worker", worker, "error", err)
			if ctx.Err() != nil {
				// The pass was cancelled or timed out; every remaining check
				// would fail as well. Stop here instead of letting the sweep
				// below evict workers that were never checked.
				return
			}
			// State unknown: keep the worker's current ring membership.
			if _, ok := inRing[worker]; ok {
				inRing[worker] = true
			}
			continue
		}

		if exists == 0 {
			slog.WarnContext(ctx, "Worker heartbeat not found", "worker", worker)
			// Remove worker from active workers set.
			if err := m.client.SRem(ctx, ActiveWorkersSet, worker).Err(); err != nil {
				slog.ErrorContext(ctx, "Unable to remove worker from active workers set", "worker", worker, "error", err)
			}
			// Remove worker from the ring only if it is a member, and forget it
			// so the sweep below does not remove (and notify) a second time.
			if _, ok := inRing[worker]; ok {
				m.removeWorker(worker)
				delete(inRing, worker)
			}
			continue
		}

		// Heartbeat present: add the worker to the ring if it is new.
		if _, ok := inRing[worker]; !ok {
			slog.InfoContext(ctx, "New worker detected", "worker", worker)
			m.addWorker(worker)
			continue
		}
		inRing[worker] = true
	}

	// Check for workers that are not active anymore.
	for worker, seen := range inRing {
		if !seen {
			slog.WarnContext(ctx, "Worker is not active anymore", "worker", worker)
			m.removeWorker(worker)
		}
	}
}
// addWorker inserts worker (a hostname) into the ring and then signals a
// layout change. The signal is sent without blocking: if the channel cannot
// take it right now, it is dropped rather than stalling the monitor.
func (m *WorkerMonitor) addWorker(worker string) {
	var newcomer consistent.Member = member{hostname: worker}
	m.ring.Add(newcomer)

	select {
	case m.layoutChangeCh <- struct{}{}:
		// Signal handed off.
	default:
		// Channel not ready; drop the signal.
	}
}
// removeWorker drops the ring member named worker and then signals a layout
// change without blocking. The signal is sent unconditionally, even when
// worker was not a member of the ring.
func (m *WorkerMonitor) removeWorker(worker string) {
	m.ring.Remove(worker)

	select {
	case m.layoutChangeCh <- struct{}{}:
		// Signal handed off.
	default:
		// Channel not ready; drop the signal.
	}
}
// WorkerHeartbeatKey returns the redis key "worker:<hostname>:heartbeat".
// The monitor treats a worker as alive while this key exists.
func WorkerHeartbeatKey(hostname string) string {
	const keyPrefix, keySuffix = "worker:", ":heartbeat"
	// All operands are strings, so fmt.Sprint inserts no separators.
	return fmt.Sprint(keyPrefix, hostname, keySuffix)
}
// hasher supplies the consistent hash ring with 64-bit xxhash digests.
type hasher struct{}

// Sum64 returns the xxhash 64-bit digest of data.
func (*hasher) Sum64(data []byte) uint64 {
	digest := xxhash.Sum64(data)
	return digest
}
// member is the ring entry for a single worker. The monitor matches ring
// members to worker hostnames via String.
type member struct {
	hostname string
}

// String returns the worker's hostname.
func (w member) String() string {
	name := w.hostname
	return name
}
|