package election
import (
"context"
"errors"
"log/slog"
"time"
"github.com/bsm/redislock"
)
// defaultLeaderElectionOptions is the configuration RunOrDie starts from
// before applying any caller-supplied LeaderElectionOption.
var defaultLeaderElectionOptions = leaderElectionOptions{
lockKey: "ibd-leader-election",
lockTTL: 10 * time.Second,
}
// RunOrDie takes part in a Redis-backed leader election until ctx is done.
//
// It starts from defaultLeaderElectionOptions, applies opts in order, and then
// loops: it tries to obtain the lock stored under the configured key and, when
// that succeeds, hands the lock and onLeader to runLeader, which returns once
// leadership has ended. Between attempts it waits one fifth of the lock TTL.
//
// RunOrDie blocks. Despite its name it never terminates the process; the only
// way it returns is ctx being canceled.
func RunOrDie(
	ctx context.Context,
	client redislock.RedisClient,
	onLeader func(context.Context),
	opts ...LeaderElectionOption,
) {
	o := defaultLeaderElectionOptions
	for _, opt := range opts {
		opt(&o)
	}

	locker := redislock.New(client)

	// Election loop
	for {
		lock, err := locker.Obtain(ctx, o.lockKey, o.lockTTL, nil)
		switch {
		case err == nil:
			// We are the leader
			slog.DebugContext(ctx, "elected leader")
			runLeader(ctx, lock, onLeader, o)
		case errors.Is(err, redislock.ErrNotObtained):
			// Another instance is the leader
		case ctx.Err() != nil:
			// The attempt failed because we are shutting down; that is not
			// worth an error log. The select below returns.
		default:
			slog.ErrorContext(ctx, "failed to obtain lock", "error", err)
		}

		// Sleep for a bit before trying again
		timer := time.NewTimer(o.lockTTL / 5)
		select {
		case <-ctx.Done():
			// Nobody reads timer.C after this point, so stopping the timer is
			// enough; there is no need to drain the channel.
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}
// runLeader runs onLeader for as long as this instance holds lock, refreshing
// the lock every tenth of the lock TTL.
//
// onLeader is started in its own goroutine with a context derived from ctx.
// runLeader returns when ctx is canceled, when onLeader returns, or when a
// refresh reports that the lock is no longer held. Other refresh errors are
// logged and retried on the next tick.
//
// On the way out the leader context is canceled and runLeader waits for
// onLeader to return BEFORE releasing the lock, so the leader code never
// overlaps with a successor. onLeader must therefore return promptly once its
// context is canceled.
func runLeader(
	ctx context.Context,
	lock *redislock.Lock,
	onLeader func(context.Context),
	o leaderElectionOptions,
) {
	// A context that is canceled when leadership ends, for whatever reason.
	leaderCtx, cancel := context.WithCancel(ctx)

	// Run the leader code; done is closed once it has returned.
	done := make(chan struct{})
	go func() {
		defer close(done)
		onLeader(leaderCtx)
	}()

	defer func() {
		// Stop the leader code and wait for it to finish first: releasing the
		// lock while onLeader is still running would let another instance (or
		// the next election round in this process) start a second leader.
		cancel()
		<-done

		// ctx may already be canceled, so release with a context that keeps
		// its values but not its cancellation, bounded by a timeout.
		relCtx, relCancel := context.WithTimeout(context.WithoutCancel(ctx), o.lockTTL)
		defer relCancel()
		if err := lock.Release(relCtx); err != nil {
			slog.ErrorContext(relCtx, "failed to release lock", "error", err)
		}
	}()

	// Refresh the lock periodically. time.NewTicker panics on a non-positive
	// interval, which a very small or zero TTL would produce.
	interval := o.lockTTL / 10
	if interval <= 0 {
		interval = time.Millisecond
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-done:
			// The leader code returned on its own; give up leadership.
			return
		case <-ctx.Done():
			return
		case <-ticker.C:
			err := lock.Refresh(leaderCtx, o.lockTTL, nil)
			switch {
			case err == nil:
				// Still the leader.
			case errors.Is(err, redislock.ErrNotObtained), errors.Is(err, redislock.ErrLockNotHeld):
				slog.ErrorContext(leaderCtx, "leadership lost", "error", err)
				return
			default:
				slog.ErrorContext(leaderCtx, "failed to refresh lock", "error", err)
			}
		}
	}
}
// leaderElectionOptions collects the tunable settings of the election.
type leaderElectionOptions struct {
// lockKey is the key under which the election lock is obtained.
lockKey string
// lockTTL is the TTL passed when obtaining and refreshing the lock; the
// retry delay, refresh interval and release timeout are derived from it.
lockTTL time.Duration
}
// LeaderElectionOption mutates the election settings; pass values of this
// type to RunOrDie to override the defaults.
type LeaderElectionOption func(*leaderElectionOptions)
// WithLockKey returns an option that replaces the key used for the election
// lock with key.
func WithLockKey(key string) LeaderElectionOption {
	setKey := func(opts *leaderElectionOptions) { opts.lockKey = key }
	return setKey
}
// WithLockTTL returns an option that replaces the TTL of the election lock
// with ttl.
func WithLockTTL(ttl time.Duration) LeaderElectionOption {
	setTTL := func(opts *leaderElectionOptions) { opts.lockTTL = ttl }
	return setTTL
}
n value='db22'>db22
Unnamed repository; edit this file 'description' to name the repository. | |