aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/leader/election/election.go
diff options
context:
space:
mode:
Diffstat (limited to 'backend/internal/leader/election/election.go')
-rw-r--r-- backend/internal/leader/election/election.go 128
1 files changed, 128 insertions, 0 deletions
diff --git a/backend/internal/leader/election/election.go b/backend/internal/leader/election/election.go
new file mode 100644
index 0000000..6f83298
--- /dev/null
+++ b/backend/internal/leader/election/election.go
@@ -0,0 +1,128 @@
+package election
+
+import (
+ "context"
+ "errors"
+ "log/slog"
+ "time"
+
+ "github.com/bsm/redislock"
+)
+
+// defaultLeaderElectionOptions is the baseline configuration that RunOrDie
+// copies before applying any caller-supplied LeaderElectionOption.
+var defaultLeaderElectionOptions = leaderElectionOptions{
+	// TTL the lock is obtained with.
+	lockTTL: time.Second * 10,
+	// Key under which the lock is obtained.
+	lockKey: "ibd-leader-election",
+}
+
+// RunOrDie runs the leader election loop and blocks until ctx is done.
+//
+// On every iteration it tries to obtain the lock stored under the configured
+// lock key through client. When the lock is obtained, onLeader is handed to
+// runLeader, which runs it for as long as this instance stays leader. After
+// each attempt (successful or not) the loop waits a fifth of the lock TTL
+// before competing for the lock again.
+//
+// opts are applied on top of defaultLeaderElectionOptions; nil options are
+// skipped. A non-positive lock TTL is replaced by the default TTL, because it
+// would otherwise turn the retry wait into a busy loop.
+func RunOrDie(
+	ctx context.Context,
+	client redislock.RedisClient,
+	onLeader func(context.Context),
+	opts ...LeaderElectionOption,
+) {
+	o := defaultLeaderElectionOptions
+	for _, opt := range opts {
+		if opt != nil {
+			opt(&o)
+		}
+	}
+	if o.lockTTL <= 0 {
+		o.lockTTL = defaultLeaderElectionOptions.lockTTL
+	}
+
+	locker := redislock.New(client)
+
+	// Election loop
+	for {
+		lock, err := locker.Obtain(ctx, o.lockKey, o.lockTTL, nil)
+		switch {
+		case err == nil:
+			// We are the leader
+			slog.DebugContext(ctx, "elected leader")
+			runLeader(ctx, lock, onLeader, o)
+		case errors.Is(err, redislock.ErrNotObtained):
+			// Another instance is the leader
+		case ctx.Err() != nil:
+			// The attempt failed because we are shutting down; that is not
+			// worth reporting as an error.
+			return
+		default:
+			slog.ErrorContext(ctx, "failed to obtain lock", "error", err)
+		}
+
+		// Sleep for a bit before trying again
+		timer := time.NewTimer(o.lockTTL / 5)
+		select {
+		case <-ctx.Done():
+			// The timer is never reused, so stopping it is enough; there is
+			// no need to drain its channel.
+			timer.Stop()
+			return
+		case <-timer.C:
+		}
+	}
+}
+
+// runLeader runs onLeader while this instance holds lock, refreshing the lock
+// every tenth of the lock TTL to keep it alive.
+//
+// onLeader receives a context derived from ctx. That context is canceled when
+// ctx is canceled, when onLeader itself returns, when a refresh reports that
+// the lock is no longer held, or when no refresh has succeeded for a full lock
+// TTL (the lock must then be assumed to have expired).
+//
+// runLeader returns only after onLeader has returned and the lock has been
+// released, so leader code is never still running once the lock is given up
+// and the election loop can never start two overlapping onLeader calls.
+// onLeader must therefore return promptly once its context is canceled.
+func runLeader(
+	ctx context.Context,
+	lock *redislock.Lock,
+	onLeader func(context.Context),
+	o leaderElectionOptions,
+) {
+	// A context that is canceled when the leader term ends
+	leaderCtx, cancel := context.WithCancel(ctx)
+
+	// Run the leader code; done is closed once it has returned.
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		// If the leader code returns, cancel the context to end the term
+		// and release the lock.
+		defer cancel()
+		onLeader(leaderCtx)
+	}()
+
+	// Teardown order matters: stop the leader code first, wait for it, and
+	// only then give up the lock.
+	defer func() {
+		cancel()
+		<-done
+
+		// leaderCtx is canceled at this point, so release with a detached
+		// context that is bounded by a timeout instead.
+		relCtx, relCancel := context.WithTimeout(context.WithoutCancel(ctx), o.lockTTL)
+		defer relCancel()
+
+		// ErrLockNotHeld is expected when leadership was lost or the lock
+		// expired while waiting; there is nothing left to release then.
+		err := lock.Release(relCtx)
+		if err != nil && !errors.Is(err, redislock.ErrLockNotHeld) {
+			slog.ErrorContext(relCtx, "failed to release lock", "error", err)
+		}
+	}()
+
+	// Refresh the lock periodically. time.NewTicker panics on a non-positive
+	// interval, so clamp very small TTLs.
+	interval := o.lockTTL / 10
+	if interval <= 0 {
+		interval = time.Millisecond
+	}
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	// Time of the last moment the lock is known to have been extended. The
+	// lock was obtained shortly before this call.
+	lastRefresh := time.Now()
+
+	for {
+		select {
+		case <-leaderCtx.Done():
+			return
+		case <-ticker.C:
+			err := lock.Refresh(leaderCtx, o.lockTTL, nil)
+			switch {
+			case err == nil:
+				lastRefresh = time.Now()
+			case leaderCtx.Err() != nil:
+				// The term ended while the refresh was in flight.
+				return
+			case errors.Is(err, redislock.ErrNotObtained), errors.Is(err, redislock.ErrLockNotHeld):
+				slog.ErrorContext(leaderCtx, "leadership lost", "error", err)
+				return
+			default:
+				slog.ErrorContext(leaderCtx, "failed to refresh lock", "error", err)
+				// Without a successful refresh for a whole TTL the lock has
+				// expired and another instance may already be leader.
+				if time.Since(lastRefresh) >= o.lockTTL {
+					slog.ErrorContext(leaderCtx, "leadership lost", "error", err)
+					return
+				}
+			}
+		}
+	}
+}
+
+// Election settings and the functional-option type used to adjust them.
+type (
+	// leaderElectionOptions collects the tunable settings of the election.
+	leaderElectionOptions struct {
+		// lockKey is the key the election lock is obtained under.
+		lockKey string
+		// lockTTL is the TTL the lock is obtained and refreshed with; the
+		// retry and refresh intervals are derived from it.
+		lockTTL time.Duration
+	}
+
+	// LeaderElectionOption adjusts a leaderElectionOptions value in place.
+	LeaderElectionOption func(*leaderElectionOptions)
+)
+
+// WithLockKey returns an option that sets the key of the election lock to key.
+func WithLockKey(key string) LeaderElectionOption {
+	setKey := func(target *leaderElectionOptions) {
+		target.lockKey = key
+	}
+	return setKey
+}
+
+// WithLockTTL returns an option that sets the TTL of the election lock to ttl.
+func WithLockTTL(ttl time.Duration) LeaderElectionOption {
+	setTTL := func(target *leaderElectionOptions) {
+		target.lockTTL = ttl
+	}
+	return setTTL
+}