diff options
Diffstat (limited to 'backend/internal/leader/election/election.go')
-rw-r--r-- | backend/internal/leader/election/election.go | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/backend/internal/leader/election/election.go b/backend/internal/leader/election/election.go new file mode 100644 index 0000000..6f83298 --- /dev/null +++ b/backend/internal/leader/election/election.go @@ -0,0 +1,128 @@ +package election + +import ( + "context" + "errors" + "log/slog" + "time" + + "github.com/bsm/redislock" +) + +var defaultLeaderElectionOptions = leaderElectionOptions{ + lockKey: "ibd-leader-election", + lockTTL: 10 * time.Second, +} + +func RunOrDie( + ctx context.Context, + client redislock.RedisClient, + onLeader func(context.Context), + opts ...LeaderElectionOption, +) { + o := defaultLeaderElectionOptions + for _, opt := range opts { + opt(&o) + } + + locker := redislock.New(client) + + // Election loop + for { + lock, err := locker.Obtain(ctx, o.lockKey, o.lockTTL, nil) + if errors.Is(err, redislock.ErrNotObtained) { + // Another instance is the leader + } else if err != nil { + slog.ErrorContext(ctx, "failed to obtain lock", "error", err) + } else { + // We are the leader + slog.DebugContext(ctx, "elected leader") + runLeader(ctx, lock, onLeader, o) + } + + // Sleep for a bit before trying again + timer := time.NewTimer(o.lockTTL / 5) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return + case <-timer.C: + } + } +} + +func runLeader( + ctx context.Context, + lock *redislock.Lock, + onLeader func(context.Context), + o leaderElectionOptions, +) { + // A context that is canceled when the leader loses the lock + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Release the lock when done + defer func() { + // Create new context without cancel if the original context is already canceled + relCtx := ctx + if ctx.Err() != nil { + relCtx = context.WithoutCancel(ctx) + } + + // Add a timeout to the release context + relCtx, cancel := context.WithTimeout(relCtx, o.lockTTL) + defer cancel() + + if err := lock.Release(relCtx); err != nil { + slog.Error("failed to release lock", "error", err) + } + }() + + // Run the leader code + go func(ctx context.Context) { + onLeader(ctx) + + // If the leader code returns, cancel the context to release the lock + cancel() + }(ctx) + + // Refresh the lock periodically + ticker := time.NewTicker(o.lockTTL / 10) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := lock.Refresh(ctx, o.lockTTL, nil) + if errors.Is(err, redislock.ErrNotObtained) || errors.Is(err, redislock.ErrLockNotHeld) { + slog.ErrorContext(ctx, "leadership lost", "error", err) + return + } else if err != nil { + slog.ErrorContext(ctx, "failed to refresh lock", "error", err) + } + case <-ctx.Done(): + return + } + } +} + +type leaderElectionOptions struct { + lockKey string + lockTTL time.Duration +} + +type LeaderElectionOption func(*leaderElectionOptions) + +func WithLockKey(key string) LeaderElectionOption { + return func(o *leaderElectionOptions) { + o.lockKey = key + } +} + +func WithLockTTL(ttl time.Duration) LeaderElectionOption { + return func(o *leaderElectionOptions) { + o.lockTTL = ttl + } +} |