Diffstat (limited to 'backend/internal/leader')
-rw-r--r-- | backend/internal/leader/election/election.go         | 128
-rw-r--r-- | backend/internal/leader/manager/ibd/auth/auth.go     | 111
-rw-r--r-- | backend/internal/leader/manager/ibd/ibd.go           |   8
-rw-r--r-- | backend/internal/leader/manager/ibd/scrape/scrape.go | 140
-rw-r--r-- | backend/internal/leader/manager/manager.go           |  90
-rw-r--r-- | backend/internal/leader/manager/monitor.go           | 164
6 files changed, 641 insertions, 0 deletions
diff --git a/backend/internal/leader/election/election.go b/backend/internal/leader/election/election.go
new file mode 100644
index 0000000..6f83298
--- /dev/null
+++ b/backend/internal/leader/election/election.go
@@ -0,0 +1,128 @@
+package election
+
+import (
+	"context"
+	"errors"
+	"log/slog"
+	"time"
+
+	"github.com/bsm/redislock"
+)
+
+var defaultLeaderElectionOptions = leaderElectionOptions{
+	lockKey: "ibd-leader-election",
+	lockTTL: 10 * time.Second,
+}
+
+func RunOrDie(
+	ctx context.Context,
+	client redislock.RedisClient,
+	onLeader func(context.Context),
+	opts ...LeaderElectionOption,
+) {
+	o := defaultLeaderElectionOptions
+	for _, opt := range opts {
+		opt(&o)
+	}
+
+	locker := redislock.New(client)
+
+	// Election loop
+	for {
+		lock, err := locker.Obtain(ctx, o.lockKey, o.lockTTL, nil)
+		if errors.Is(err, redislock.ErrNotObtained) {
+			// Another instance is the leader
+		} else if err != nil {
+			slog.ErrorContext(ctx, "failed to obtain lock", "error", err)
+		} else {
+			// We are the leader
+			slog.DebugContext(ctx, "elected leader")
+			runLeader(ctx, lock, onLeader, o)
+		}
+
+		// Sleep for a bit before trying again
+		timer := time.NewTimer(o.lockTTL / 5)
+		select {
+		case <-ctx.Done():
+			if !timer.Stop() {
+				<-timer.C
+			}
+			return
+		case <-timer.C:
+		}
+	}
+}
+
+func runLeader(
+	ctx context.Context,
+	lock *redislock.Lock,
+	onLeader func(context.Context),
+	o leaderElectionOptions,
+) {
+	// A context that is canceled when the leader loses the lock
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	// Release the lock when done
+	defer func() {
+		// Create new context without cancel if the original context is already canceled
+		relCtx := ctx
+		if ctx.Err() != nil {
+			relCtx = context.WithoutCancel(ctx)
+		}
+
+		// Add a timeout to the release context
+		relCtx, cancel := context.WithTimeout(relCtx, o.lockTTL)
+		defer cancel()
+
+		if err := lock.Release(relCtx); err != nil {
+			slog.Error("failed to release lock", "error", err)
+		}
+	}()
+
+	// Run the leader code
+	go func(ctx context.Context) {
+		onLeader(ctx)
+
+		// If the leader code returns, cancel the context to release the lock
+		cancel()
+	}(ctx)
+
+	// Refresh the lock periodically
+	ticker := time.NewTicker(o.lockTTL / 10)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			err := lock.Refresh(ctx, o.lockTTL, nil)
+			if errors.Is(err, redislock.ErrNotObtained) || errors.Is(err, redislock.ErrLockNotHeld) {
+				slog.ErrorContext(ctx, "leadership lost", "error", err)
+				return
+			} else if err != nil {
+				slog.ErrorContext(ctx, "failed to refresh lock", "error", err)
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+type leaderElectionOptions struct {
+	lockKey string
+	lockTTL time.Duration
+}
+
+type LeaderElectionOption func(*leaderElectionOptions)
+
+func WithLockKey(key string) LeaderElectionOption {
+	return func(o *leaderElectionOptions) {
+		o.lockKey = key
+	}
+}
+
+func WithLockTTL(ttl time.Duration) LeaderElectionOption {
+	return func(o *leaderElectionOptions) {
+		o.lockTTL = ttl
+	}
+}
diff --git a/backend/internal/leader/manager/ibd/auth/auth.go b/backend/internal/leader/manager/ibd/auth/auth.go
new file mode 100644
index 0000000..9b5502d
--- /dev/null
+++ b/backend/internal/leader/manager/ibd/auth/auth.go
@@ -0,0 +1,111 @@
+package auth
+
+import (
+	"context"
+	"log/slog"
+	"time"
+
+	"github.com/ansg191/ibd-trader-backend/internal/database"
+	"github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue"
+
+	"github.com/redis/go-redis/v9"
+	"github.com/robfig/cron/v3"
+)
+
+const (
Queue = "auth-queue" + QueueEncoding = taskqueue.EncodingJSON +) + +// Manager is responsible for sending authentication tasks to the workers. +type Manager struct { + queue taskqueue.TaskQueue[TaskInfo] + db database.Executor + schedule cron.Schedule +} + +func New( + ctx context.Context, + db database.Executor, + rClient *redis.Client, + schedule cron.Schedule, +) (*Manager, error) { + queue, err := taskqueue.New( + ctx, + rClient, + Queue, + "auth-manager", + taskqueue.WithEncoding[TaskInfo](QueueEncoding), + ) + if err != nil { + return nil, err + } + + return &Manager{ + queue: queue, + db: db, + schedule: schedule, + }, nil +} + +func (m *Manager) Run(ctx context.Context) { + for { + now := time.Now() + // Find the next time + nextTime := m.schedule.Next(now) + if nextTime.IsZero() { + // Sleep until the next day + time.Sleep(time.Until(now.AddDate(0, 0, 1))) + continue + } + + timer := time.NewTimer(nextTime.Sub(now)) + slog.DebugContext(ctx, "waiting for next Auth scrape", "next_exec", nextTime) + + select { + case <-timer.C: + nextExec := m.schedule.Next(nextTime) + m.scrapeCookies(ctx, nextExec) + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return + } + } +} + +// scrapeCookies scrapes the cookies for every user from the IBD website. +// +// This iterates through all users with IBD credentials and checks whether their cookies are still valid. +// If the cookies are invalid or missing, it re-authenticates the user and updates the cookies in the database. +func (m *Manager) scrapeCookies(ctx context.Context, deadline time.Time) { + ctx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + + // Get all users with IBD credentials + users, err := database.ListUsers(ctx, m.db, true) + if err != nil { + slog.ErrorContext(ctx, "failed to get users", "error", err) + return + } + + // Create a new task for each user + for _, user := range users { + task := TaskInfo{ + UserSubject: user.Subject, + } + + // Enqueue the task + _, err := m.queue.Enqueue(ctx, task) + if err != nil { + slog.ErrorContext(ctx, "failed to enqueue task", "error", err) + } + } + + slog.InfoContext(ctx, "enqueued tasks for all users") +} + +type TaskInfo struct { + UserSubject string `json:"user_subject"` +} diff --git a/backend/internal/leader/manager/ibd/ibd.go b/backend/internal/leader/manager/ibd/ibd.go new file mode 100644 index 0000000..e2d4fc0 --- /dev/null +++ b/backend/internal/leader/manager/ibd/ibd.go @@ -0,0 +1,8 @@ +package ibd + +type Schedules struct { + // Auth schedule + Auth string + // IBD50 schedule + IBD50 string +} diff --git a/backend/internal/leader/manager/ibd/scrape/scrape.go b/backend/internal/leader/manager/ibd/scrape/scrape.go new file mode 100644 index 0000000..870ce5e --- /dev/null +++ b/backend/internal/leader/manager/ibd/scrape/scrape.go @@ -0,0 +1,140 @@ +package scrape + +import ( + "context" + "errors" + "log/slog" + "time" + + "github.com/ansg191/ibd-trader-backend/internal/database" + "github.com/ansg191/ibd-trader-backend/internal/ibd" + "github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue" + + "github.com/redis/go-redis/v9" + "github.com/robfig/cron/v3" +) + +const ( + Queue = "scrape-queue" + QueueEncoding = taskqueue.EncodingJSON + Channel = "scrape-channel" +) + +// Manager is responsible for sending scraping tasks to the workers. 
+type Manager struct {
+	client   *ibd.Client
+	db       database.Executor
+	queue    taskqueue.TaskQueue[TaskInfo]
+	schedule cron.Schedule
+	pubsub   *redis.PubSub
+}
+
+func New(
+	ctx context.Context,
+	client *ibd.Client,
+	db database.Executor,
+	redis *redis.Client,
+	schedule cron.Schedule,
+) (*Manager, error) {
+	queue, err := taskqueue.New(
+		ctx,
+		redis,
+		Queue,
+		"ibd-manager",
+		taskqueue.WithEncoding[TaskInfo](QueueEncoding),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Manager{
+		client:   client,
+		db:       db,
+		queue:    queue,
+		schedule: schedule,
+		pubsub:   redis.Subscribe(ctx, Channel),
+	}, nil
+}
+
+func (m *Manager) Close() error {
+	return m.pubsub.Close()
+}
+
+func (m *Manager) Run(ctx context.Context) {
+	ch := m.pubsub.Channel()
+	for {
+		now := time.Now()
+		// Find the next time
+		nextTime := m.schedule.Next(now)
+		if nextTime.IsZero() {
+			// Sleep until the next day
+			time.Sleep(time.Until(now.AddDate(0, 0, 1)))
+			continue
+		}
+
+		timer := time.NewTimer(nextTime.Sub(now))
+		slog.DebugContext(ctx, "waiting for next IBD50 scrape", "next_exec", nextTime)
+
+		select {
+		case <-timer.C:
+			nextExec := m.schedule.Next(nextTime)
+			m.scrapeIBD50(ctx, nextExec)
+		case <-ch:
+			nextExec := m.schedule.Next(time.Now())
+			m.scrapeIBD50(ctx, nextExec)
+		case <-ctx.Done():
+			if !timer.Stop() {
+				<-timer.C
+			}
+			return
+		}
+	}
+}
+
+func (m *Manager) scrapeIBD50(ctx context.Context, deadline time.Time) {
+	ctx, cancel := context.WithDeadline(ctx, deadline)
+	defer cancel()
+
+	stocks, err := m.client.GetIBD50(ctx)
+	if err != nil {
+		if errors.Is(err, ibd.ErrNoAvailableCookies) {
+			slog.WarnContext(ctx, "no available cookies", "error", err)
+			return
+		}
+		slog.ErrorContext(ctx, "failed to get IBD50", "error", err)
+		return
+	}
+
+	for _, stock := range stocks {
+		// Add stock to DB
+		err = database.AddStock(ctx, m.db, database.Stock{
+			Symbol: stock.Symbol,
+			Name:   stock.Name,
+			IBDUrl: stock.QuoteURL.String(),
+		})
+		if err != nil {
+			slog.ErrorContext(ctx, "failed to add stock", "error", err)
+			continue
+		}
+
+		// Add ranking to Db
+		err = database.AddRanking(ctx, m.db, stock.Symbol, int(stock.Rank), 0)
+		if err != nil {
+			slog.ErrorContext(ctx, "failed to add ranking", "error", err)
+			continue
+		}
+
+		// Add scrape task to queue
+		task := TaskInfo{Symbol: stock.Symbol}
+		taskID, err := m.queue.Enqueue(ctx, task)
+		if err != nil {
+			slog.ErrorContext(ctx, "failed to enqueue task", "error", err)
+		}
+
+		slog.DebugContext(ctx, "enqueued scrape task", "task_id", taskID, "symbol", stock.Symbol)
+	}
+}
+
+type TaskInfo struct {
+	Symbol string `json:"symbol"`
+}
diff --git a/backend/internal/leader/manager/manager.go b/backend/internal/leader/manager/manager.go
new file mode 100644
index 0000000..61e27e0
--- /dev/null
+++ b/backend/internal/leader/manager/manager.go
@@ -0,0 +1,90 @@
+package manager
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"sync"
+
+	"github.com/ansg191/ibd-trader-backend/internal/config"
+	"github.com/ansg191/ibd-trader-backend/internal/database"
+	"github.com/ansg191/ibd-trader-backend/internal/ibd"
+	"github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/auth"
+	ibd2 "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape"
+
+	"github.com/redis/go-redis/v9"
+	"github.com/robfig/cron/v3"
+)
+
+type Manager struct {
+	db      database.Database
+	Monitor *WorkerMonitor
+	Scraper *ibd2.Manager
+	Auth    *auth.Manager
+}
+
+func New(
+	ctx context.Context,
+	cfg *config.Config,
+	client *redis.Client,
+	db database.Database,
+	ibd *ibd.Client,
+) (*Manager, error) {
+	scraperSchedule, err := cron.ParseStandard(cfg.IBD.Schedules.IBD50)
+	if err != nil {
+		return nil, fmt.Errorf("unable to parse IBD50 schedule: %w", err)
+	}
+	scraper, err := ibd2.New(ctx, ibd, db, client, scraperSchedule)
+	if err != nil {
+		return nil, err
+	}
+
+	authSchedule, err := cron.ParseStandard(cfg.IBD.Schedules.Auth)
+	if err != nil {
+		return nil, fmt.Errorf("unable to parse Auth schedule: %w", err)
+	}
+	authManager, err := auth.New(ctx, db, client, authSchedule)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Manager{
+		db:      db,
+		Monitor: NewWorkerMonitor(client),
+		Scraper: scraper,
+		Auth:    authManager,
+	}, nil
+}
+
+func (m *Manager) Run(ctx context.Context) error {
+	if err := m.db.Migrate(ctx); err != nil {
+		slog.ErrorContext(ctx, "Unable to migrate database", "error", err)
+		return err
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(4)
+
+	go func() {
+		defer wg.Done()
+		m.db.Maintenance(ctx)
+	}()
+
+	go func() {
+		defer wg.Done()
+		m.Monitor.Start(ctx)
+	}()
+
+	go func() {
+		defer wg.Done()
+		m.Scraper.Run(ctx)
+	}()
+
+	go func() {
+		defer wg.Done()
+		m.Auth.Run(ctx)
+	}()
+
+	wg.Wait()
+	return ctx.Err()
+}
diff --git a/backend/internal/leader/manager/monitor.go b/backend/internal/leader/manager/monitor.go
new file mode 100644
index 0000000..3b2e3ec
--- /dev/null
+++ b/backend/internal/leader/manager/monitor.go
@@ -0,0 +1,164 @@
+package manager
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"time"
+
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash/v2"
+	"github.com/redis/go-redis/v9"
+)
+
+const (
+	MonitorInterval  = 5 * time.Second
+	ActiveWorkersSet = "active-workers"
+)
+
+// WorkerMonitor is a struct that monitors workers and their heartbeats over redis.
+type WorkerMonitor struct {
+	client *redis.Client
+
+	// ring is a consistent hash ring that distributes partitions over detected workers.
+	ring *consistent.Consistent
+	// layoutChangeCh is a channel that others can listen to for layout changes to the ring.
+	layoutChangeCh chan struct{}
+}
+
+// NewWorkerMonitor creates a new WorkerMonitor.
+func NewWorkerMonitor(client *redis.Client) *WorkerMonitor {
+	var members []consistent.Member
+	return &WorkerMonitor{
+		client: client,
+		ring: consistent.New(members, consistent.Config{
+			Hasher:            new(hasher),
+			PartitionCount:    consistent.DefaultPartitionCount,
+			ReplicationFactor: consistent.DefaultReplicationFactor,
+			Load:              consistent.DefaultLoad,
+		}),
+		layoutChangeCh: make(chan struct{}),
+	}
+}
+
+func (m *WorkerMonitor) Close() error {
+	close(m.layoutChangeCh)
+	return nil
+}
+
+func (m *WorkerMonitor) Changes() <-chan struct{} {
+	return m.layoutChangeCh
+}
+
+func (m *WorkerMonitor) Start(ctx context.Context) {
+	m.monitorWorkers(ctx)
+	ticker := time.NewTicker(MonitorInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			m.monitorWorkers(ctx)
+		}
+	}
+}
+
+func (m *WorkerMonitor) monitorWorkers(ctx context.Context) {
+	ctx, cancel := context.WithTimeout(ctx, MonitorInterval)
+	defer cancel()
+
+	// Get all active workers.
+	workers, err := m.client.SMembers(ctx, ActiveWorkersSet).Result()
+	if err != nil {
+		slog.ErrorContext(ctx, "Unable to get active workers", "error", err)
+		return
+	}
+
+	// Get existing workers in the ring.
+	existingWorkers := m.ring.GetMembers()
+	ewMap := make(map[string]bool)
+	for _, worker := range existingWorkers {
+		ewMap[worker.String()] = false
+	}
+
+	// Check workers' heartbeats.
+	for _, worker := range workers {
+		exists, err := m.client.Exists(ctx, WorkerHeartbeatKey(worker)).Result()
+		if err != nil {
+			slog.ErrorContext(ctx, "Unable to check worker heartbeat", "worker", worker, "error", err)
+			continue
+		}
+
+		if exists == 0 {
+			slog.WarnContext(ctx, "Worker heartbeat not found", "worker", worker)
+
+			// Remove worker from active workers set.
+			if err = m.client.SRem(ctx, ActiveWorkersSet, worker).Err(); err != nil {
+				slog.ErrorContext(ctx, "Unable to remove worker from active workers set", "worker", worker, "error", err)
+			}
+
+			// Remove worker from the ring.
+			m.removeWorker(worker)
+		} else {
+			// Add worker to the ring if it doesn't exist.
+			if _, ok := ewMap[worker]; !ok {
+				slog.InfoContext(ctx, "New worker detected", "worker", worker)
+				m.addWorker(worker)
+			} else {
+				ewMap[worker] = true
+			}
+		}
+	}
+
+	// Check for workers that are not active anymore.
+	for worker, exists := range ewMap {
+		if !exists {
+			slog.WarnContext(ctx, "Worker is not active anymore", "worker", worker)
+			m.removeWorker(worker)
+		}
+	}
+}
+
+func (m *WorkerMonitor) addWorker(worker string) {
+	m.ring.Add(member{hostname: worker})
+
+	// Notify others about the layout change.
+	select {
+	case m.layoutChangeCh <- struct{}{}:
+		// Notify others.
+	default:
+		// No one is listening.
+	}
+}
+
+func (m *WorkerMonitor) removeWorker(worker string) {
+	m.ring.Remove(worker)
+
+	// Notify others about the layout change.
+	select {
+	case m.layoutChangeCh <- struct{}{}:
+		// Notify others.
+	default:
+		// No one is listening.
+	}
+}
+
+func WorkerHeartbeatKey(hostname string) string {
+	return fmt.Sprintf("worker:%s:heartbeat", hostname)
+}
+
+type hasher struct{}
+
+func (h *hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+}
+
+type member struct {
+	hostname string
+}
+
+func (m member) String() string {
+	return m.hostname
+}
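
For context, a minimal sketch (not part of this change) of how the new election package could be driven from a leader binary, using only the APIs added in this diff. The redis address, signal handling, and the main package below are illustrative assumptions; the actual wiring lives elsewhere in the service.

// Example only: drives election.RunOrDie from a hypothetical main package.
package main

import (
	"context"
	"log/slog"
	"os/signal"
	"syscall"
	"time"

	"github.com/redis/go-redis/v9"

	"github.com/ansg191/ibd-trader-backend/internal/leader/election"
)

func main() {
	// Stop competing for leadership (and release the lock) on SIGINT/SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Assumed address for illustration; *redis.Client satisfies redislock.RedisClient.
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Blocks, repeatedly trying to obtain the lock until ctx is canceled.
	election.RunOrDie(ctx, client,
		func(ctx context.Context) {
			// Runs only while this instance holds the lock; ctx is canceled
			// when leadership is lost or the process is shutting down.
			slog.InfoContext(ctx, "acting as leader")
			<-ctx.Done()
		},
		election.WithLockKey("ibd-leader-election"),
		election.WithLockTTL(10*time.Second),
	)
}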