package scrape

import (
	"context"
	"errors"
	"log/slog"
	"time"

	"github.com/ansg191/ibd-trader/backend/internal/database"
	"github.com/ansg191/ibd-trader/backend/internal/ibd"
	"github.com/ansg191/ibd-trader/backend/internal/redis/taskqueue"
	"github.com/redis/go-redis/v9"
	"github.com/robfig/cron/v3"
)

const (
	// Queue is the name of the task queue that scrape tasks are enqueued on.
	Queue = "scrape-queue"
	// QueueEncoding is the encoding used for TaskInfo payloads on Queue.
	QueueEncoding = taskqueue.EncodingJSON
	// Channel is the Redis pub/sub channel the Manager subscribes to; any
	// message received on it triggers an immediate IBD50 scrape in Run.
	Channel = "scrape-channel"
)

// Manager is responsible for sending scraping tasks to the workers.
//
// On every activation of its cron schedule, or when a message arrives on
// Channel, it fetches the IBD50 list, stores each stock and its rank in the
// database, and enqueues a TaskInfo for each stock on Queue.
type Manager struct {
	client   *ibd.Client
	db       database.Executor
	queue    taskqueue.TaskQueue[TaskInfo]
	schedule cron.Schedule
	pubsub   *redis.PubSub
}

// New creates a Manager backed by a task queue named Queue (consumer name
// "ibd-manager") and subscribes it to Channel. Call Close to release the
// subscription.
func New(
	ctx context.Context,
	client *ibd.Client,
	db database.Executor,
	rdb *redis.Client,
	schedule cron.Schedule,
) (*Manager, error) {
	queue, err := taskqueue.New(
		ctx,
		rdb,
		Queue,
		"ibd-manager",
		taskqueue.WithEncoding[TaskInfo](QueueEncoding),
	)
	if err != nil {
		return nil, err
	}

	return &Manager{
		client:   client,
		db:       db,
		queue:    queue,
		schedule: schedule,
		pubsub:   rdb.Subscribe(ctx, Channel),
	}, nil
}

// Close closes the pub/sub subscription. A concurrently running Run returns
// once it observes that the subscription channel has been closed.
func (m *Manager) Close() error {
	return m.pubsub.Close()
}

// Run blocks, performing an IBD50 scrape at each time produced by the
// schedule and whenever a message is received on Channel. Each scrape is
// bounded by the schedule's following activation time. Run returns when ctx
// is done or when the pub/sub subscription is closed.
func (m *Manager) Run(ctx context.Context) {
	ch := m.pubsub.Channel()
	for {
		now := time.Now()

		// Find the next time
		nextTime := m.schedule.Next(now)
		if nextTime.IsZero() {
			// The schedule reports no upcoming activation: check again in a
			// day, but stay responsive to cancellation (time.Sleep is not).
			idle := time.NewTimer(time.Until(now.AddDate(0, 0, 1)))
			select {
			case <-idle.C:
				continue
			case <-ctx.Done():
				idle.Stop()
				return
			}
		}

		timer := time.NewTimer(nextTime.Sub(now))
		slog.DebugContext(ctx, "waiting for next IBD50 scrape", "next_exec", nextTime)

		select {
		case <-timer.C:
			m.scrapeIBD50(ctx, m.schedule.Next(nextTime))
		case _, ok := <-ch:
			timer.Stop()
			if !ok {
				// The subscription was closed (see Close). A closed channel is
				// always ready, so continuing would trigger endless scrapes.
				slog.DebugContext(ctx, "scrape channel closed, stopping manager")
				return
			}
			m.scrapeIBD50(ctx, m.schedule.Next(time.Now()))
		case <-ctx.Done():
			// The timer is discarded, so stopping it is enough; no drain needed.
			timer.Stop()
			return
		}
	}
}

// scrapeIBD50 fetches the IBD50 list and, for each stock, stores the stock
// and its ranking in the database and enqueues a TaskInfo for it. Per-stock
// failures are logged and that stock is skipped. The operation is bounded by
// deadline; a zero deadline (schedule has no further activation) means no
// bound, instead of an already-expired context.
func (m *Manager) scrapeIBD50(ctx context.Context, deadline time.Time) {
	if !deadline.IsZero() {
		var cancel context.CancelFunc
		ctx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

	stocks, err := m.client.GetIBD50(ctx)
	if err != nil {
		if errors.Is(err, ibd.ErrNoAvailableCookies) {
			slog.WarnContext(ctx, "no available cookies", "error", err)
			return
		}
		slog.ErrorContext(ctx, "failed to get IBD50", "error", err)
		return
	}

	for _, stock := range stocks {
		// Once the deadline passes (or the parent is cancelled) every remaining
		// call would fail; stop with a single log line instead of one per stock.
		if ctxErr := ctx.Err(); ctxErr != nil {
			slog.WarnContext(ctx, "IBD50 scrape interrupted", "error", ctxErr)
			return
		}

		// Add stock to DB
		if err := database.AddStock(ctx, m.db, database.Stock{
			Symbol: stock.Symbol,
			Name:   stock.Name,
			IBDUrl: stock.QuoteURL.String(),
		}); err != nil {
			slog.ErrorContext(ctx, "failed to add stock", "symbol", stock.Symbol, "error", err)
			continue
		}

		// Add ranking to DB.
		// NOTE(review): the meaning of the trailing 0 argument is defined by
		// database.AddRanking — confirm against its signature.
		if err := database.AddRanking(ctx, m.db, stock.Symbol, int(stock.Rank), 0); err != nil {
			slog.ErrorContext(ctx, "failed to add ranking", "symbol", stock.Symbol, "error", err)
			continue
		}

		// Add scrape task to queue
		taskID, err := m.queue.Enqueue(ctx, TaskInfo{Symbol: stock.Symbol})
		if err != nil {
			slog.ErrorContext(ctx, "failed to enqueue task", "symbol", stock.Symbol, "error", err)
			continue
		}
		slog.DebugContext(ctx, "enqueued scrape task", "task_id", taskID, "symbol", stock.Symbol)
	}
}

// TaskInfo is the payload of a scrape task enqueued on Queue.
type TaskInfo struct {
	// Symbol identifies the stock to scrape (the IBD50 entry's Symbol).
	Symbol string `json:"symbol"`
}