author    Anshul Gupta <ansg191@anshulg.com>  2024-08-11 13:15:50 -0700
committer Anshul Gupta <ansg191@anshulg.com>  2024-08-11 13:15:50 -0700
commit    6a3c21fb0b1c126849f2bbff494403bbe901448e (patch)
tree      5d7805524357c2c8a9819c39d2051a4e3633a1d5 /backend/internal/leader
parent    29c6040a51616e9e4cf6c70ee16391b2a3b238c9 (diff)
parent    f34b92ded11b07f78575ac62c260a380c468e5ea (diff)
Merge remote-tracking branch 'backend/main'
Diffstat (limited to 'backend/internal/leader')
-rw-r--r--  backend/internal/leader/election/election.go         | 128
-rw-r--r--  backend/internal/leader/manager/ibd/auth/auth.go      | 111
-rw-r--r--  backend/internal/leader/manager/ibd/ibd.go            |   8
-rw-r--r--  backend/internal/leader/manager/ibd/scrape/scrape.go  | 140
-rw-r--r--  backend/internal/leader/manager/manager.go            |  90
-rw-r--r--  backend/internal/leader/manager/monitor.go            | 164
6 files changed, 641 insertions, 0 deletions
diff --git a/backend/internal/leader/election/election.go b/backend/internal/leader/election/election.go
new file mode 100644
index 0000000..6f83298
--- /dev/null
+++ b/backend/internal/leader/election/election.go
@@ -0,0 +1,128 @@
+package election
+
+import (
+ "context"
+ "errors"
+ "log/slog"
+ "time"
+
+ "github.com/bsm/redislock"
+)
+
+var defaultLeaderElectionOptions = leaderElectionOptions{
+ lockKey: "ibd-leader-election",
+ lockTTL: 10 * time.Second,
+}
+
+func RunOrDie(
+ ctx context.Context,
+ client redislock.RedisClient,
+ onLeader func(context.Context),
+ opts ...LeaderElectionOption,
+) {
+ o := defaultLeaderElectionOptions
+ for _, opt := range opts {
+ opt(&o)
+ }
+
+ locker := redislock.New(client)
+
+ // Election loop
+ for {
+ lock, err := locker.Obtain(ctx, o.lockKey, o.lockTTL, nil)
+ if errors.Is(err, redislock.ErrNotObtained) {
+ // Another instance is the leader
+ } else if err != nil {
+ slog.ErrorContext(ctx, "failed to obtain lock", "error", err)
+ } else {
+ // We are the leader
+ slog.DebugContext(ctx, "elected leader")
+ runLeader(ctx, lock, onLeader, o)
+ }
+
+ // Sleep for a bit before trying again
+ timer := time.NewTimer(o.lockTTL / 5)
+ select {
+ case <-ctx.Done():
+ if !timer.Stop() {
+ <-timer.C
+ }
+ return
+ case <-timer.C:
+ }
+ }
+}
+
+func runLeader(
+ ctx context.Context,
+ lock *redislock.Lock,
+ onLeader func(context.Context),
+ o leaderElectionOptions,
+) {
+ // A context that is canceled when the leader loses the lock
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ // Release the lock when done
+ defer func() {
+ // Create new context without cancel if the original context is already canceled
+ relCtx := ctx
+ if ctx.Err() != nil {
+ relCtx = context.WithoutCancel(ctx)
+ }
+
+ // Add a timeout to the release context
+ relCtx, cancel := context.WithTimeout(relCtx, o.lockTTL)
+ defer cancel()
+
+ if err := lock.Release(relCtx); err != nil {
+ slog.ErrorContext(relCtx, "failed to release lock", "error", err)
+ }
+ }()
+
+ // Run the leader code
+ go func(ctx context.Context) {
+ onLeader(ctx)
+
+ // If the leader code returns, cancel the context to release the lock
+ cancel()
+ }(ctx)
+
+ // Refresh the lock periodically
+ ticker := time.NewTicker(o.lockTTL / 10)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ err := lock.Refresh(ctx, o.lockTTL, nil)
+ if errors.Is(err, redislock.ErrNotObtained) || errors.Is(err, redislock.ErrLockNotHeld) {
+ slog.ErrorContext(ctx, "leadership lost", "error", err)
+ return
+ } else if err != nil {
+ slog.ErrorContext(ctx, "failed to refresh lock", "error", err)
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+type leaderElectionOptions struct {
+ lockKey string
+ lockTTL time.Duration
+}
+
+type LeaderElectionOption func(*leaderElectionOptions)
+
+func WithLockKey(key string) LeaderElectionOption {
+ return func(o *leaderElectionOptions) {
+ o.lockKey = key
+ }
+}
+
+func WithLockTTL(ttl time.Duration) LeaderElectionOption {
+ return func(o *leaderElectionOptions) {
+ o.lockTTL = ttl
+ }
+}
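
The election loop above is self-contained: callers hand it a Redis client and a callback, and the callback's context is canceled the moment leadership is lost. A minimal caller sketch, assuming a Redis instance at localhost:6379 (the lock key and TTL values are illustrative):

package main

import (
    "context"
    "log/slog"
    "os/signal"
    "syscall"
    "time"

    "github.com/ansg191/ibd-trader-backend/internal/leader/election"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    // Blocks until ctx is canceled. onLeader runs only while the lock is
    // held; its context is canceled as soon as leadership is lost.
    election.RunOrDie(ctx, client,
        func(ctx context.Context) {
            slog.InfoContext(ctx, "doing leader-only work")
            <-ctx.Done() // hold leadership until canceled
        },
        election.WithLockKey("my-app-leader"), // illustrative key
        election.WithLockTTL(15*time.Second),
    )
}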
diff --git a/backend/internal/leader/manager/ibd/auth/auth.go b/backend/internal/leader/manager/ibd/auth/auth.go
new file mode 100644
index 0000000..9b5502d
--- /dev/null
+++ b/backend/internal/leader/manager/ibd/auth/auth.go
@@ -0,0 +1,111 @@
+package auth
+
+import (
+ "context"
+ "log/slog"
+ "time"
+
+ "github.com/ansg191/ibd-trader-backend/internal/database"
+ "github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue"
+
+ "github.com/redis/go-redis/v9"
+ "github.com/robfig/cron/v3"
+)
+
+const (
+ Queue = "auth-queue"
+ QueueEncoding = taskqueue.EncodingJSON
+)
+
+// Manager is responsible for sending authentication tasks to the workers.
+type Manager struct {
+ queue taskqueue.TaskQueue[TaskInfo]
+ db database.Executor
+ schedule cron.Schedule
+}
+
+func New(
+ ctx context.Context,
+ db database.Executor,
+ rClient *redis.Client,
+ schedule cron.Schedule,
+) (*Manager, error) {
+ queue, err := taskqueue.New(
+ ctx,
+ rClient,
+ Queue,
+ "auth-manager",
+ taskqueue.WithEncoding[TaskInfo](QueueEncoding),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return &Manager{
+ queue: queue,
+ db: db,
+ schedule: schedule,
+ }, nil
+}
+
+func (m *Manager) Run(ctx context.Context) {
+ for {
+ now := time.Now()
+ // Find the next time
+ nextTime := m.schedule.Next(now)
+ if nextTime.IsZero() {
+ // Sleep until the next day
+ time.Sleep(time.Until(now.AddDate(0, 0, 1)))
+ continue
+ }
+
+ timer := time.NewTimer(nextTime.Sub(now))
+ slog.DebugContext(ctx, "waiting for next Auth scrape", "next_exec", nextTime)
+
+ select {
+ case <-timer.C:
+ nextExec := m.schedule.Next(nextTime)
+ m.scrapeCookies(ctx, nextExec)
+ case <-ctx.Done():
+ if !timer.Stop() {
+ <-timer.C
+ }
+ return
+ }
+ }
+}
+
+// scrapeCookies enqueues a cookie-refresh task for every user with IBD credentials.
+//
+// Workers consume these tasks, check whether each user's cookies are still valid,
+// and re-authenticate the user and update the cookies in the database when needed.
+func (m *Manager) scrapeCookies(ctx context.Context, deadline time.Time) {
+ ctx, cancel := context.WithDeadline(ctx, deadline)
+ defer cancel()
+
+ // Get all users with IBD credentials
+ users, err := database.ListUsers(ctx, m.db, true)
+ if err != nil {
+ slog.ErrorContext(ctx, "failed to get users", "error", err)
+ return
+ }
+
+ // Create a new task for each user
+ for _, user := range users {
+ task := TaskInfo{
+ UserSubject: user.Subject,
+ }
+
+ // Enqueue the task
+ _, err := m.queue.Enqueue(ctx, task)
+ if err != nil {
+ slog.ErrorContext(ctx, "failed to enqueue task", "error", err)
+ }
+ }
+
+ slog.InfoContext(ctx, "enqueued tasks for all users")
+}
+
+type TaskInfo struct {
+ UserSubject string `json:"user_subject"`
+}
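
Both managers' Run loops are driven by cron.Schedule.Next, which returns the first activation strictly after the given instant, or the zero time when there is none (hence the sleep-a-day fallback). A small sketch of the timing math, assuming a standard five-field expression:

package main

import (
    "fmt"
    "time"

    "github.com/robfig/cron/v3"
)

func main() {
    // A daily 06:30 run, the kind of expression the Auth schedule expects.
    sched, err := cron.ParseStandard("30 6 * * *")
    if err != nil {
        panic(err)
    }

    now := time.Now()
    next := sched.Next(now)   // first activation strictly after now
    after := sched.Next(next) // the following one; Run passes this as the task deadline
    fmt.Println("next run:", next, "deadline:", after)
}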
diff --git a/backend/internal/leader/manager/ibd/ibd.go b/backend/internal/leader/manager/ibd/ibd.go
new file mode 100644
index 0000000..e2d4fc0
--- /dev/null
+++ b/backend/internal/leader/manager/ibd/ibd.go
@@ -0,0 +1,8 @@
+package ibd
+
+type Schedules struct {
+ // Auth schedule
+ Auth string
+ // IBD50 schedule
+ IBD50 string
+}
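
These fields hold the cron expressions that manager.New parses with cron.ParseStandard. A purely illustrative configuration; the real values come from config.Config:

package main

import (
    "fmt"

    "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd"
)

func main() {
    // Illustrative values only; the actual schedules are configured elsewhere.
    s := ibd.Schedules{
        Auth:  "0 5 * * *", // re-check IBD cookies daily at 05:00
        IBD50: "0 6 * * 1", // refresh the IBD50 list Mondays at 06:00
    }
    fmt.Printf("%+v\n", s)
}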
diff --git a/backend/internal/leader/manager/ibd/scrape/scrape.go b/backend/internal/leader/manager/ibd/scrape/scrape.go
new file mode 100644
index 0000000..870ce5e
--- /dev/null
+++ b/backend/internal/leader/manager/ibd/scrape/scrape.go
@@ -0,0 +1,140 @@
+package scrape
+
+import (
+ "context"
+ "errors"
+ "log/slog"
+ "time"
+
+ "github.com/ansg191/ibd-trader-backend/internal/database"
+ "github.com/ansg191/ibd-trader-backend/internal/ibd"
+ "github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue"
+
+ "github.com/redis/go-redis/v9"
+ "github.com/robfig/cron/v3"
+)
+
+const (
+ Queue = "scrape-queue"
+ QueueEncoding = taskqueue.EncodingJSON
+ Channel = "scrape-channel"
+)
+
+// Manager is responsible for sending scraping tasks to the workers.
+type Manager struct {
+ client *ibd.Client
+ db database.Executor
+ queue taskqueue.TaskQueue[TaskInfo]
+ schedule cron.Schedule
+ pubsub *redis.PubSub
+}
+
+func New(
+ ctx context.Context,
+ client *ibd.Client,
+ db database.Executor,
+ redis *redis.Client,
+ schedule cron.Schedule,
+) (*Manager, error) {
+ queue, err := taskqueue.New(
+ ctx,
+ redis,
+ Queue,
+ "ibd-manager",
+ taskqueue.WithEncoding[TaskInfo](QueueEncoding),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return &Manager{
+ client: client,
+ db: db,
+ queue: queue,
+ schedule: schedule,
+ pubsub: redis.Subscribe(ctx, Channel),
+ }, nil
+}
+
+func (m *Manager) Close() error {
+ return m.pubsub.Close()
+}
+
+func (m *Manager) Run(ctx context.Context) {
+ ch := m.pubsub.Channel()
+ for {
+ now := time.Now()
+ // Find the next time
+ nextTime := m.schedule.Next(now)
+ if nextTime.IsZero() {
+ // Sleep until the next day
+ time.Sleep(time.Until(now.AddDate(0, 0, 1)))
+ continue
+ }
+
+ timer := time.NewTimer(nextTime.Sub(now))
+ slog.DebugContext(ctx, "waiting for next IBD50 scrape", "next_exec", nextTime)
+
+ select {
+ case <-timer.C:
+ nextExec := m.schedule.Next(nextTime)
+ m.scrapeIBD50(ctx, nextExec)
+ case <-ch:
+ nextExec := m.schedule.Next(time.Now())
+ m.scrapeIBD50(ctx, nextExec)
+ case <-ctx.Done():
+ if !timer.Stop() {
+ <-timer.C
+ }
+ return
+ }
+ }
+}
+
+func (m *Manager) scrapeIBD50(ctx context.Context, deadline time.Time) {
+ ctx, cancel := context.WithDeadline(ctx, deadline)
+ defer cancel()
+
+ stocks, err := m.client.GetIBD50(ctx)
+ if err != nil {
+ if errors.Is(err, ibd.ErrNoAvailableCookies) {
+ slog.WarnContext(ctx, "no available cookies", "error", err)
+ return
+ }
+ slog.ErrorContext(ctx, "failed to get IBD50", "error", err)
+ return
+ }
+
+ for _, stock := range stocks {
+ // Add stock to DB
+ err = database.AddStock(ctx, m.db, database.Stock{
+ Symbol: stock.Symbol,
+ Name: stock.Name,
+ IBDUrl: stock.QuoteURL.String(),
+ })
+ if err != nil {
+ slog.ErrorContext(ctx, "failed to add stock", "error", err)
+ continue
+ }
+
+ // Add ranking to DB
+ err = database.AddRanking(ctx, m.db, stock.Symbol, int(stock.Rank), 0)
+ if err != nil {
+ slog.ErrorContext(ctx, "failed to add ranking", "error", err)
+ continue
+ }
+
+ // Add scrape task to queue
+ task := TaskInfo{Symbol: stock.Symbol}
+ taskID, err := m.queue.Enqueue(ctx, task)
+ if err != nil {
+ slog.ErrorContext(ctx, "failed to enqueue task", "error", err)
+ continue
+ }
+ slog.DebugContext(ctx, "enqueued scrape task", "task_id", taskID, "symbol", stock.Symbol)
+ }
+}
+
+type TaskInfo struct {
+ Symbol string `json:"symbol"`
+}
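
Besides the cron schedule, Run also listens on the scrape-channel pub/sub channel, so any component can force an immediate IBD50 scrape. A sketch of such a trigger, assuming a Redis instance at localhost:6379; the message body is ignored, since Run only waits for delivery:

package main

import (
    "context"

    "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    // Wake the leader's scrape loop immediately; the payload is arbitrary.
    if err := client.Publish(ctx, scrape.Channel, "trigger").Err(); err != nil {
        panic(err)
    }
}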
diff --git a/backend/internal/leader/manager/manager.go b/backend/internal/leader/manager/manager.go
new file mode 100644
index 0000000..61e27e0
--- /dev/null
+++ b/backend/internal/leader/manager/manager.go
@@ -0,0 +1,90 @@
+package manager
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "sync"
+
+ "github.com/ansg191/ibd-trader-backend/internal/config"
+ "github.com/ansg191/ibd-trader-backend/internal/database"
+ "github.com/ansg191/ibd-trader-backend/internal/ibd"
+ "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/auth"
+ "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape"
+
+ "github.com/redis/go-redis/v9"
+ "github.com/robfig/cron/v3"
+)
+
+type Manager struct {
+ db database.Database
+ Monitor *WorkerMonitor
+ Scraper *scrape.Manager
+ Auth *auth.Manager
+}
+
+func New(
+ ctx context.Context,
+ cfg *config.Config,
+ client *redis.Client,
+ db database.Database,
+ ibd *ibd.Client,
+) (*Manager, error) {
+ scraperSchedule, err := cron.ParseStandard(cfg.IBD.Schedules.IBD50)
+ if err != nil {
+ return nil, fmt.Errorf("unable to parse IBD50 schedule: %w", err)
+ }
+ scraper, err := scrape.New(ctx, ibd, db, client, scraperSchedule)
+ if err != nil {
+ return nil, err
+ }
+
+ authSchedule, err := cron.ParseStandard(cfg.IBD.Schedules.Auth)
+ if err != nil {
+ return nil, fmt.Errorf("unable to parse Auth schedule: %w", err)
+ }
+ authManager, err := auth.New(ctx, db, client, authSchedule)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Manager{
+ db: db,
+ Monitor: NewWorkerMonitor(client),
+ Scraper: scraper,
+ Auth: authManager,
+ }, nil
+}
+
+func (m *Manager) Run(ctx context.Context) error {
+ if err := m.db.Migrate(ctx); err != nil {
+ slog.ErrorContext(ctx, "Unable to migrate database", "error", err)
+ return err
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(4)
+
+ go func() {
+ defer wg.Done()
+ m.db.Maintenance(ctx)
+ }()
+
+ go func() {
+ defer wg.Done()
+ m.Monitor.Start(ctx)
+ }()
+
+ go func() {
+ defer wg.Done()
+ m.Scraper.Run(ctx)
+ }()
+
+ go func() {
+ defer wg.Done()
+ m.Auth.Run(ctx)
+ }()
+
+ wg.Wait()
+ return ctx.Err()
+}
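
Manager.Run blocks until its context is canceled, which pairs naturally with election.RunOrDie: the scheduling work should only run on the instance holding the leader lock. A sketch of one plausible composition (the real entrypoint wiring is outside this diff; cfg, db, client, and ibdClient are assumed to be constructed by the caller):

package main

import (
    "context"
    "log/slog"

    "github.com/ansg191/ibd-trader-backend/internal/config"
    "github.com/ansg191/ibd-trader-backend/internal/database"
    "github.com/ansg191/ibd-trader-backend/internal/ibd"
    "github.com/ansg191/ibd-trader-backend/internal/leader/election"
    "github.com/ansg191/ibd-trader-backend/internal/leader/manager"

    "github.com/redis/go-redis/v9"
)

// runLeader composes election and manager: the manager starts when this
// instance wins the election and stops when leadership is lost.
func runLeader(
    ctx context.Context,
    cfg *config.Config,
    client *redis.Client,
    db database.Database,
    ibdClient *ibd.Client,
) {
    election.RunOrDie(ctx, client, func(ctx context.Context) {
        mgr, err := manager.New(ctx, cfg, client, db, ibdClient)
        if err != nil {
            slog.ErrorContext(ctx, "failed to create manager", "error", err)
            return
        }
        if err := mgr.Run(ctx); err != nil {
            slog.ErrorContext(ctx, "manager exited", "error", err)
        }
    })
}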
diff --git a/backend/internal/leader/manager/monitor.go b/backend/internal/leader/manager/monitor.go
new file mode 100644
index 0000000..3b2e3ec
--- /dev/null
+++ b/backend/internal/leader/manager/monitor.go
@@ -0,0 +1,164 @@
+package manager
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "time"
+
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash/v2"
+ "github.com/redis/go-redis/v9"
+)
+
+const (
+ MonitorInterval = 5 * time.Second
+ ActiveWorkersSet = "active-workers"
+)
+
+// WorkerMonitor is a struct that monitors workers and their heartbeats over redis.
+type WorkerMonitor struct {
+ client *redis.Client
+
+ // ring is a consistent hash ring that distributes partitions over detected workers.
+ ring *consistent.Consistent
+ // layoutChangeCh is a channel that others can listen to for layout changes to the ring.
+ layoutChangeCh chan struct{}
+}
+
+// NewWorkerMonitor creates a new WorkerMonitor.
+func NewWorkerMonitor(client *redis.Client) *WorkerMonitor {
+ var members []consistent.Member
+ return &WorkerMonitor{
+ client: client,
+ ring: consistent.New(members, consistent.Config{
+ Hasher: new(hasher),
+ PartitionCount: consistent.DefaultPartitionCount,
+ ReplicationFactor: consistent.DefaultReplicationFactor,
+ Load: consistent.DefaultLoad,
+ }),
+ layoutChangeCh: make(chan struct{}),
+ }
+}
+
+func (m *WorkerMonitor) Close() error {
+ close(m.layoutChangeCh)
+ return nil
+}
+
+func (m *WorkerMonitor) Changes() <-chan struct{} {
+ return m.layoutChangeCh
+}
+
+func (m *WorkerMonitor) Start(ctx context.Context) {
+ m.monitorWorkers(ctx)
+ ticker := time.NewTicker(MonitorInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ m.monitorWorkers(ctx)
+ }
+ }
+}
+
+func (m *WorkerMonitor) monitorWorkers(ctx context.Context) {
+ ctx, cancel := context.WithTimeout(ctx, MonitorInterval)
+ defer cancel()
+
+ // Get all active workers.
+ workers, err := m.client.SMembers(ctx, ActiveWorkersSet).Result()
+ if err != nil {
+ slog.ErrorContext(ctx, "Unable to get active workers", "error", err)
+ return
+ }
+
+ // Get existing workers in the ring.
+ existingWorkers := m.ring.GetMembers()
+ ewMap := make(map[string]bool)
+ for _, worker := range existingWorkers {
+ ewMap[worker.String()] = false
+ }
+
+ // Check workers' heartbeats.
+ for _, worker := range workers {
+ exists, err := m.client.Exists(ctx, WorkerHeartbeatKey(worker)).Result()
+ if err != nil {
+ slog.ErrorContext(ctx, "Unable to check worker heartbeat", "worker", worker, "error", err)
+ continue
+ }
+
+ if exists == 0 {
+ slog.WarnContext(ctx, "Worker heartbeat not found", "worker", worker)
+
+ // Remove worker from active workers set.
+ if err = m.client.SRem(ctx, ActiveWorkersSet, worker).Err(); err != nil {
+ slog.ErrorContext(ctx, "Unable to remove worker from active workers set", "worker", worker, "error", err)
+ }
+
+ // Remove worker from the ring.
+ m.removeWorker(worker)
+ } else {
+ // Add worker to the ring if it doesn't exist.
+ if _, ok := ewMap[worker]; !ok {
+ slog.InfoContext(ctx, "New worker detected", "worker", worker)
+ m.addWorker(worker)
+ } else {
+ ewMap[worker] = true
+ }
+ }
+ }
+
+ // Check for workers that are not active anymore.
+ for worker, exists := range ewMap {
+ if !exists {
+ slog.WarnContext(ctx, "Worker is not active anymore", "worker", worker)
+ m.removeWorker(worker)
+ }
+ }
+}
+
+func (m *WorkerMonitor) addWorker(worker string) {
+ m.ring.Add(member{hostname: worker})
+
+ // Notify others about the layout change.
+ select {
+ case m.layoutChangeCh <- struct{}{}:
+ // Notify others.
+ default:
+ // No one is listening.
+ }
+}
+
+func (m *WorkerMonitor) removeWorker(worker string) {
+ m.ring.Remove(worker)
+
+ // Notify others about the layout change.
+ select {
+ case m.layoutChangeCh <- struct{}{}:
+ // Notify others.
+ default:
+ // No one is listening.
+ }
+}
+
+func WorkerHeartbeatKey(hostname string) string {
+ return fmt.Sprintf("worker:%s:heartbeat", hostname)
+}
+
+type hasher struct{}
+
+func (h *hasher) Sum64(data []byte) uint64 {
+ return xxhash.Sum64(data)
+}
+
+type member struct {
+ hostname string
+}
+
+func (m member) String() string {
+ return m.hostname
+}
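
The worker side of this heartbeat protocol is not part of this diff, but the monitor implies its contract: a worker adds its hostname to the active-workers set and keeps a TTL'd key at WorkerHeartbeatKey(hostname) alive. A sketch of what such a worker loop might look like; the 15-second TTL is an assumption, chosen only to exceed MonitorInterval:

package main

import (
    "context"
    "os"
    "time"

    "github.com/ansg191/ibd-trader-backend/internal/leader/manager"

    "github.com/redis/go-redis/v9"
)

// heartbeatTTL is an assumed value; it only has to outlive MonitorInterval
// so a healthy worker is never mistaken for a dead one.
const heartbeatTTL = 15 * time.Second

func heartbeat(ctx context.Context, client *redis.Client) error {
    host, err := os.Hostname()
    if err != nil {
        return err
    }

    // Register in the set that monitorWorkers scans with SMembers.
    if err := client.SAdd(ctx, manager.ActiveWorkersSet, host).Err(); err != nil {
        return err
    }

    // Refresh the heartbeat key well before it expires.
    ticker := time.NewTicker(heartbeatTTL / 3)
    defer ticker.Stop()
    for {
        key := manager.WorkerHeartbeatKey(host)
        if err := client.Set(ctx, key, time.Now().Unix(), heartbeatTTL).Err(); err != nil {
            return err
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
    }
}

func main() {
    client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    _ = heartbeat(context.Background(), client)
}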