package manager

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/buraksezer/consistent"
	"github.com/cespare/xxhash/v2"
	"github.com/redis/go-redis/v9"
)

const (
	MonitorInterval  = 5 * time.Second
	ActiveWorkersSet = "active-workers"
)

// WorkerMonitor monitors workers and their heartbeats via Redis.
type WorkerMonitor struct {
	client *redis.Client

	// ring is a consistent hash ring that distributes partitions over detected workers.
	ring *consistent.Consistent

	// layoutChangeCh is a channel that others can listen to for layout changes to the ring.
	layoutChangeCh chan struct{}
}

// NewWorkerMonitor creates a new WorkerMonitor with an empty ring.
func NewWorkerMonitor(client *redis.Client) *WorkerMonitor {
	var members []consistent.Member
	return &WorkerMonitor{
		client: client,
		ring: consistent.New(members, consistent.Config{
			Hasher:            new(hasher),
			PartitionCount:    consistent.DefaultPartitionCount,
			ReplicationFactor: consistent.DefaultReplicationFactor,
			Load:              consistent.DefaultLoad,
		}),
		layoutChangeCh: make(chan struct{}),
	}
}

// Close closes the layout change channel. Call it only after Start has
// returned: addWorker and removeWorker send on this channel and would panic
// if it were already closed.
func (m *WorkerMonitor) Close() error {
	close(m.layoutChangeCh)
	return nil
}

// Changes returns a channel that is signalled whenever the ring layout
// changes. Notifications are dropped if no one is currently receiving.
func (m *WorkerMonitor) Changes() <-chan struct{} {
	return m.layoutChangeCh
}

// Start runs one immediate check and then re-checks the workers every
// MonitorInterval until ctx is cancelled.
func (m *WorkerMonitor) Start(ctx context.Context) {
	m.monitorWorkers(ctx)

	ticker := time.NewTicker(MonitorInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			m.monitorWorkers(ctx)
		}
	}
}

// monitorWorkers reconciles the hash ring with the active workers recorded in
// Redis: workers whose heartbeat is missing are evicted from both the set and
// the ring, and newly seen workers are added to the ring.
func (m *WorkerMonitor) monitorWorkers(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, MonitorInterval)
	defer cancel()

	// Get all active workers.
	workers, err := m.client.SMembers(ctx, ActiveWorkersSet).Result()
	if err != nil {
		slog.ErrorContext(ctx, "Unable to get active workers", "error", err)
		return
	}

	// Get existing workers in the ring.
	existingWorkers := m.ring.GetMembers()
	ewMap := make(map[string]bool)
	for _, worker := range existingWorkers {
		ewMap[worker.String()] = false
	}

	// Check workers' heartbeats.
	for _, worker := range workers {
		exists, err := m.client.Exists(ctx, WorkerHeartbeatKey(worker)).Result()
		if err != nil {
			slog.ErrorContext(ctx, "Unable to check worker heartbeat", "worker", worker, "error", err)
			continue
		}

		if exists == 0 {
			slog.WarnContext(ctx, "Worker heartbeat not found", "worker", worker)

			// Remove worker from active workers set.
			if err = m.client.SRem(ctx, ActiveWorkersSet, worker).Err(); err != nil {
				slog.ErrorContext(ctx, "Unable to remove worker from active workers set", "worker", worker, "error", err)
			}

			// Remove worker from the ring.
			m.removeWorker(worker)
		} else {
			// Add worker to the ring if it doesn't exist.
			if _, ok := ewMap[worker]; !ok {
				slog.InfoContext(ctx, "New worker detected", "worker", worker)
				m.addWorker(worker)
			} else {
				ewMap[worker] = true
			}
		}
	}

	// Check for workers that are not active anymore.
	for worker, seen := range ewMap {
		if !seen {
			slog.WarnContext(ctx, "Worker is not active anymore", "worker", worker)
			m.removeWorker(worker)
		}
	}
}

// addWorker adds the worker to the ring and notifies listeners, if any.
func (m *WorkerMonitor) addWorker(worker string) {
	m.ring.Add(member{hostname: worker})

	// Notify others about the layout change.
	select {
	case m.layoutChangeCh <- struct{}{}:
		// Notify others.
	default:
		// No one is listening.
	}
}

// removeWorker removes the worker from the ring and notifies listeners, if any.
func (m *WorkerMonitor) removeWorker(worker string) {
	m.ring.Remove(worker)

	// Notify others about the layout change.
	select {
	case m.layoutChangeCh <- struct{}{}:
		// Notify others.
	default:
		// No one is listening.
	}
}

// WorkerHeartbeatKey returns the Redis key under which a worker's heartbeat is stored.
func WorkerHeartbeatKey(hostname string) string {
	return fmt.Sprintf("worker:%s:heartbeat", hostname)
}

// hasher implements consistent.Hasher using xxhash.
type hasher struct{}

func (h *hasher) Sum64(data []byte) uint64 {
	return xxhash.Sum64(data)
}

// member wraps a worker hostname so it satisfies consistent.Member.
type member struct {
	hostname string
}

func (m member) String() string {
	return m.hostname
}
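
// The monitor above is purely an observer: it expects each worker process to
// add its hostname to ActiveWorkersSet and to keep a heartbeat key alive in
// Redis. As a rough sketch of that worker-side contract (the helper name, its
// signature, and the TTL-based SET are illustrative assumptions, not part of
// this package's API), a worker might periodically run something like the
// following:

// RegisterWorkerHeartbeat (hypothetical helper) registers hostname in the
// active-workers set and refreshes its heartbeat key. The ttl should
// comfortably exceed the refresh period so the monitor keeps seeing the key
// while the worker is alive.
func RegisterWorkerHeartbeat(ctx context.Context, client *redis.Client, hostname string, ttl time.Duration) error {
	// Ensure the worker is listed in the set the monitor scans.
	if err := client.SAdd(ctx, ActiveWorkersSet, hostname).Err(); err != nil {
		return fmt.Errorf("add worker to active set: %w", err)
	}
	// Refresh the heartbeat key. If the worker dies, the key expires and the
	// monitor drops the worker from the ring on a subsequent pass.
	if err := client.Set(ctx, WorkerHeartbeatKey(hostname), time.Now().Unix(), ttl).Err(); err != nil {
		return fmt.Errorf("set worker heartbeat: %w", err)
	}
	return nil
}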