// Package election implements a simple leader election on top of a Redis
// lock provided by github.com/bsm/redislock.
package election

import (
	"context"
	"errors"
	"log/slog"
	"time"

	"github.com/bsm/redislock"
)

// defaultLeaderElectionOptions holds the settings used for every option the
// caller does not override.
var defaultLeaderElectionOptions = leaderElectionOptions{
	lockKey: "ibd-leader-election",
	lockTTL: 10 * time.Second,
}

// RunOrDie takes part in the leader election until ctx is canceled.
//
// It repeatedly tries to obtain the election lock. Whenever the lock is
// obtained, onLeader is invoked with a context that is canceled as soon as
// leadership ends (lock lost, lock no longer refreshable, or ctx canceled).
// onLeader is expected to return promptly once its context is canceled.
// After leadership ends, or when the lock could not be obtained, the function
// waits lockTTL/5 before trying again.
//
// Despite its name, RunOrDie never terminates the process; it simply returns
// when ctx is canceled.
func RunOrDie(
	ctx context.Context,
	client redislock.RedisClient,
	onLeader func(context.Context),
	opts ...LeaderElectionOption,
) {
	o := defaultLeaderElectionOptions
	for _, opt := range opts {
		opt(&o)
	}
	// A non-positive TTL cannot be used for the lock and would make the
	// refresh ticker panic, so fall back to the default instead.
	if o.lockTTL <= 0 {
		slog.WarnContext(ctx, "invalid lock TTL, using default",
			"ttl", o.lockTTL, "default", defaultLeaderElectionOptions.lockTTL)
		o.lockTTL = defaultLeaderElectionOptions.lockTTL
	}

	locker := redislock.New(client)

	// Election loop
	for {
		lock, err := locker.Obtain(ctx, o.lockKey, o.lockTTL, nil)
		switch {
		case err == nil:
			// We are the leader
			slog.DebugContext(ctx, "elected leader")
			runLeader(ctx, lock, onLeader, o)
		case errors.Is(err, redislock.ErrNotObtained):
			// Another instance is the leader
		case ctx.Err() != nil:
			// The attempt failed because we are shutting down; not an error.
			return
		default:
			slog.ErrorContext(ctx, "failed to obtain lock", "error", err)
		}

		// Sleep for a bit before trying again
		timer := time.NewTimer(o.lockTTL / 5)
		select {
		case <-ctx.Done():
			// The timer is discarded, so there is no need to drain its channel.
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// runLeader runs onLeader while keeping lock refreshed, and returns once
// leadership has ended and the lock has been released.
//
// Leadership ends when onLeader returns, when ctx is canceled, when a refresh
// reports that the lock is no longer held, or when no refresh has succeeded
// for a full lockTTL (at which point the lock is assumed to have expired).
func runLeader(
	ctx context.Context,
	lock *redislock.Lock,
	onLeader func(context.Context),
	o leaderElectionOptions,
) {
	// A context that is canceled when the leader loses the lock
	leaderCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Run the leader code; done is closed when it returns.
	done := make(chan struct{})
	go func() {
		defer close(done)
		onLeader(leaderCtx)
	}()

	// Step down in order: tell the leader code to stop, give it a chance to
	// finish, and only then release the lock so another instance can take over.
	defer func() {
		cancel()
		waitForLeader(ctx, done, o.lockTTL)
		releaseLock(ctx, lock, o.lockTTL)
	}()

	// Refresh the lock periodically
	ticker := time.NewTicker(o.lockTTL / 10)
	defer ticker.Stop()

	// Start time of the most recent successful refresh (initially: now, just
	// after the lock was obtained). Used to detect an expired lock when Redis
	// cannot be reached.
	lastRefresh := time.Now()
	for {
		select {
		case <-done:
			// The leader code returned on its own.
			return
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Record the time before the call so the estimate of the lock's
			// remaining lifetime errs on the safe side.
			attempt := time.Now()
			err := lock.Refresh(ctx, o.lockTTL, nil)
			switch {
			case err == nil:
				lastRefresh = attempt
			case errors.Is(err, redislock.ErrNotObtained),
				errors.Is(err, redislock.ErrLockNotHeld):
				slog.ErrorContext(ctx, "leadership lost", "error", err)
				return
			case ctx.Err() != nil:
				// The refresh failed because we are shutting down.
				return
			default:
				slog.ErrorContext(ctx, "failed to refresh lock", "error", err)
				// Without a successful refresh for a whole TTL the lock must
				// be assumed expired; continuing would risk two leaders.
				if time.Since(lastRefresh) >= o.lockTTL {
					slog.ErrorContext(ctx, "leadership lost", "error", err)
					return
				}
			}
		}
	}
}

// waitForLeader blocks until done is closed or grace has elapsed, whichever
// comes first, and logs a warning in the latter case.
func waitForLeader(ctx context.Context, done <-chan struct{}, grace time.Duration) {
	timer := time.NewTimer(grace)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
		slog.WarnContext(ctx, "leader callback still running after grace period", "grace", grace)
	}
}

// releaseLock releases lock using a context that is detached from the
// cancellation of ctx and bounded by timeout, so the release is attempted
// even during shutdown.
func releaseLock(ctx context.Context, lock *redislock.Lock, timeout time.Duration) {
	relCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), timeout)
	defer cancel()

	err := lock.Release(relCtx)
	switch {
	case err == nil:
	case errors.Is(err, redislock.ErrLockNotHeld):
		// Expected after leadership was lost: there is nothing left to release.
		slog.DebugContext(relCtx, "lock already released or expired", "error", err)
	default:
		slog.ErrorContext(relCtx, "failed to release lock", "error", err)
	}
}

// leaderElectionOptions collects the tunable settings of the election.
type leaderElectionOptions struct {
	lockKey string        // key passed to the lock client when obtaining the lock
	lockTTL time.Duration // lock TTL; refresh runs every lockTTL/10, retries every lockTTL/5
}

// LeaderElectionOption customizes the behavior of RunOrDie.
type LeaderElectionOption func(*leaderElectionOptions)

// WithLockKey sets the key used for the election lock.
func WithLockKey(key string) LeaderElectionOption {
	return func(o *leaderElectionOptions) {
		o.lockKey = key
	}
}

// WithLockTTL sets the TTL of the election lock. The refresh and retry
// intervals are derived from it. RunOrDie replaces non-positive values with
// the default TTL.
func WithLockTTL(ttl time.Duration) LeaderElectionOption {
	return func(o *leaderElectionOptions) {
		o.lockTTL = ttl
	}
}