package scraper import ( "context" "errors" "fmt" "log/slog" "time" "github.com/ansg191/ibd-trader-backend/internal/database" "github.com/ansg191/ibd-trader-backend/internal/ibd" "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape" "github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue" "github.com/ansg191/ibd-trader-backend/internal/worker/analyzer" "github.com/redis/go-redis/v9" ) const ( lockTimeout = 1 * time.Minute dequeueTimeout = 5 * time.Second ) func RunScraper( ctx context.Context, redis *redis.Client, client *ibd.Client, db database.TransactionExecutor, name string, ) error { queue, err := taskqueue.New( ctx, redis, scrape.Queue, name, taskqueue.WithEncoding[scrape.TaskInfo](scrape.QueueEncoding), ) if err != nil { return err } aQueue, err := taskqueue.New( ctx, redis, analyzer.Queue, name, taskqueue.WithEncoding[analyzer.TaskInfo](analyzer.QueueEncoding), ) if err != nil { return err } for { select { case <-ctx.Done(): return ctx.Err() default: waitForTask(ctx, queue, aQueue, client, db) } } } func waitForTask( ctx context.Context, queue taskqueue.TaskQueue[scrape.TaskInfo], aQueue taskqueue.TaskQueue[analyzer.TaskInfo], client *ibd.Client, db database.TransactionExecutor, ) { task, err := queue.Dequeue(ctx, lockTimeout, dequeueTimeout) if err != nil { slog.ErrorContext(ctx, "Failed to dequeue task", "error", err) return } if task == nil { // No task available. return } errCh := make(chan error) resCh := make(chan string) defer close(errCh) defer close(resCh) go func() { res, err := scrapeUrl(ctx, client, db, aQueue, task.Data.Symbol) if err != nil { errCh <- err return } resCh <- res }() ticker := time.NewTicker(lockTimeout / 5) defer ticker.Stop() for { select { case <-ctx.Done(): // Context was canceled. Return early. return case <-ticker.C: // Extend the lock periodically. func() { ctx, cancel := context.WithTimeout(ctx, lockTimeout/5) defer cancel() err := queue.Extend(ctx, task.ID) if err != nil { slog.ErrorContext(ctx, "Failed to extend lock", "error", err) } }() case err = <-errCh: // scrapeUrl has errored. slog.ErrorContext(ctx, "Failed to scrape URL", "error", err) _, err = queue.Return(ctx, task.ID, err) if err != nil { slog.ErrorContext(ctx, "Failed to return task", "error", err) return } return case res := <-resCh: // scrapeUrl has completed successfully. slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol) err = queue.Complete(ctx, task.ID, res) if err != nil { slog.ErrorContext(ctx, "Failed to complete task", "error", err) return } return } } } func scrapeUrl( ctx context.Context, client *ibd.Client, db database.TransactionExecutor, aQueue taskqueue.TaskQueue[analyzer.TaskInfo], symbol string, ) (string, error) { ctx, cancel := context.WithTimeout(ctx, lockTimeout) defer cancel() stockUrl, err := getStockUrl(ctx, db, client, symbol) if err != nil { return "", fmt.Errorf("failed to get stock url: %w", err) } // Scrape the stock info. info, err := client.StockInfo(ctx, stockUrl) if err != nil { return "", fmt.Errorf("failed to get stock info: %w", err) } // Add stock info to the database. id, err := database.AddStockInfo(ctx, db, info) if err != nil { return "", fmt.Errorf("failed to add stock info: %w", err) } // Add the stock to the analyzer queue. _, err = aQueue.Enqueue(ctx, analyzer.TaskInfo{ID: id}) if err != nil { return "", fmt.Errorf("failed to enqueue analysis task: %w", err) } slog.DebugContext(ctx, "Added stock info", "id", id) return id, nil } func getStockUrl(ctx context.Context, db database.TransactionExecutor, client *ibd.Client, symbol string) (string, error) { // Get the stock from the database. stock, err := database.GetStock(ctx, db, symbol) if err == nil { return stock.IBDUrl, nil } if !errors.Is(err, database.ErrStockNotFound) { return "", fmt.Errorf("failed to get stock: %w", err) } // If stock isn't found in the database, get the stock from IBD. stock, err = client.Search(ctx, symbol) if errors.Is(err, ibd.ErrSymbolNotFound) { return "", fmt.Errorf("symbol not found: %w", err) } if err != nil { return "", fmt.Errorf("failed to search for symbol: %w", err) } // Add the stock to the database. err = database.AddStock(ctx, db, stock) if err != nil { return "", fmt.Errorf("failed to add stock: %w", err) } return stock.IBDUrl, nil }