diff options
Diffstat (limited to 'backend/internal/worker/scraper/scraper.go')
-rw-r--r-- | backend/internal/worker/scraper/scraper.go | 198 |
1 files changed, 198 insertions, 0 deletions
diff --git a/backend/internal/worker/scraper/scraper.go b/backend/internal/worker/scraper/scraper.go new file mode 100644 index 0000000..c5c1b6c --- /dev/null +++ b/backend/internal/worker/scraper/scraper.go @@ -0,0 +1,198 @@ +package scraper + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/ansg191/ibd-trader-backend/internal/database" + "github.com/ansg191/ibd-trader-backend/internal/ibd" + "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape" + "github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue" + "github.com/ansg191/ibd-trader-backend/internal/worker/analyzer" + + "github.com/redis/go-redis/v9" +) + +const ( + lockTimeout = 1 * time.Minute + dequeueTimeout = 5 * time.Second +) + +func RunScraper( + ctx context.Context, + redis *redis.Client, + client *ibd.Client, + db database.TransactionExecutor, + name string, +) error { + queue, err := taskqueue.New( + ctx, + redis, + scrape.Queue, + name, + taskqueue.WithEncoding[scrape.TaskInfo](scrape.QueueEncoding), + ) + if err != nil { + return err + } + + aQueue, err := taskqueue.New( + ctx, + redis, + analyzer.Queue, + name, + taskqueue.WithEncoding[analyzer.TaskInfo](analyzer.QueueEncoding), + ) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + waitForTask(ctx, queue, aQueue, client, db) + } + } +} + +func waitForTask( + ctx context.Context, + queue taskqueue.TaskQueue[scrape.TaskInfo], + aQueue taskqueue.TaskQueue[analyzer.TaskInfo], + client *ibd.Client, + db database.TransactionExecutor, +) { + task, err := queue.Dequeue(ctx, lockTimeout, dequeueTimeout) + if err != nil { + slog.ErrorContext(ctx, "Failed to dequeue task", "error", err) + return + } + if task == nil { + // No task available. + return + } + + errCh := make(chan error) + resCh := make(chan string) + defer close(errCh) + defer close(resCh) + go func() { + res, err := scrapeUrl(ctx, client, db, aQueue, task.Data.Symbol) + if err != nil { + errCh <- err + return + } + resCh <- res + }() + + ticker := time.NewTicker(lockTimeout / 5) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // Context was canceled. Return early. + return + case <-ticker.C: + // Extend the lock periodically. + func() { + ctx, cancel := context.WithTimeout(ctx, lockTimeout/5) + defer cancel() + + err := queue.Extend(ctx, task.ID) + if err != nil { + slog.ErrorContext(ctx, "Failed to extend lock", "error", err) + } + }() + case err = <-errCh: + // scrapeUrl has errored. + slog.ErrorContext(ctx, "Failed to scrape URL", "error", err) + _, err = queue.Return(ctx, task.ID, err) + if err != nil { + slog.ErrorContext(ctx, "Failed to return task", "error", err) + return + } + return + case res := <-resCh: + // scrapeUrl has completed successfully. + slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol) + err = queue.Complete(ctx, task.ID, res) + if err != nil { + slog.ErrorContext(ctx, "Failed to complete task", "error", err) + return + } + return + } + } +} + +func scrapeUrl( + ctx context.Context, + client *ibd.Client, + db database.TransactionExecutor, + aQueue taskqueue.TaskQueue[analyzer.TaskInfo], + symbol string, +) (string, error) { + ctx, cancel := context.WithTimeout(ctx, lockTimeout) + defer cancel() + + stockUrl, err := getStockUrl(ctx, db, client, symbol) + if err != nil { + return "", fmt.Errorf("failed to get stock url: %w", err) + } + + // Scrape the stock info. + info, err := client.StockInfo(ctx, stockUrl) + if err != nil { + return "", fmt.Errorf("failed to get stock info: %w", err) + } + + // Add stock info to the database. + id, err := database.AddStockInfo(ctx, db, info) + if err != nil { + return "", fmt.Errorf("failed to add stock info: %w", err) + } + + // Add the stock to the analyzer queue. + _, err = aQueue.Enqueue(ctx, analyzer.TaskInfo{ID: id}) + if err != nil { + return "", fmt.Errorf("failed to enqueue analysis task: %w", err) + } + + slog.DebugContext(ctx, "Added stock info", "id", id) + + return id, nil +} + +func getStockUrl(ctx context.Context, db database.TransactionExecutor, client *ibd.Client, symbol string) (string, error) { + // Get the stock from the database. + stock, err := database.GetStock(ctx, db, symbol) + if err == nil { + return stock.IBDUrl, nil + } + if !errors.Is(err, database.ErrStockNotFound) { + return "", fmt.Errorf("failed to get stock: %w", err) + } + + // If stock isn't found in the database, get the stock from IBD. + stock, err = client.Search(ctx, symbol) + if errors.Is(err, ibd.ErrSymbolNotFound) { + return "", fmt.Errorf("symbol not found: %w", err) + } + if err != nil { + return "", fmt.Errorf("failed to search for symbol: %w", err) + } + + // Add the stock to the database. + err = database.AddStock(ctx, db, stock) + if err != nil { + return "", fmt.Errorf("failed to add stock: %w", err) + } + + return stock.IBDUrl, nil +} |