aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/worker/scraper
diff options
context:
space:
mode:
Diffstat (limited to 'backend/internal/worker/scraper')
-rw-r--r--backend/internal/worker/scraper/scraper.go55
1 files changed, 31 insertions, 24 deletions
diff --git a/backend/internal/worker/scraper/scraper.go b/backend/internal/worker/scraper/scraper.go
index 4788834..c5c1b6c 100644
--- a/backend/internal/worker/scraper/scraper.go
+++ b/backend/internal/worker/scraper/scraper.go
@@ -77,10 +77,17 @@ func waitForTask(
return
}
- ch := make(chan error)
+ errCh := make(chan error)
+ resCh := make(chan string)
+ defer close(errCh)
+ defer close(resCh)
go func() {
- defer close(ch)
- ch <- scrapeUrl(ctx, client, db, aQueue, task.Data.Symbol)
+ res, err := scrapeUrl(ctx, client, db, aQueue, task.Data.Symbol)
+ if err != nil {
+ errCh <- err
+ return
+ }
+ resCh <- res
}()
ticker := time.NewTicker(lockTimeout / 5)
@@ -102,22 +109,22 @@ func waitForTask(
slog.ErrorContext(ctx, "Failed to extend lock", "error", err)
}
}()
- case err = <-ch:
- // scrapeUrl has completed.
+ case err = <-errCh:
+ // scrapeUrl has errored.
+ slog.ErrorContext(ctx, "Failed to scrape URL", "error", err)
+ _, err = queue.Return(ctx, task.ID, err)
if err != nil {
- slog.ErrorContext(ctx, "Failed to scrape URL", "error", err)
- _, err = queue.Return(ctx, task.ID, err)
- if err != nil {
- slog.ErrorContext(ctx, "Failed to return task", "error", err)
- return
- }
- } else {
- slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol)
- err = queue.Complete(ctx, task.ID, nil)
- if err != nil {
- slog.ErrorContext(ctx, "Failed to complete task", "error", err)
- return
- }
+ slog.ErrorContext(ctx, "Failed to return task", "error", err)
+ return
+ }
+ return
+ case res := <-resCh:
+ // scrapeUrl has completed successfully.
+ slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol)
+ err = queue.Complete(ctx, task.ID, res)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to complete task", "error", err)
+ return
}
return
}
@@ -130,36 +137,36 @@ func scrapeUrl(
db database.TransactionExecutor,
aQueue taskqueue.TaskQueue[analyzer.TaskInfo],
symbol string,
-) error {
+) (string, error) {
ctx, cancel := context.WithTimeout(ctx, lockTimeout)
defer cancel()
stockUrl, err := getStockUrl(ctx, db, client, symbol)
if err != nil {
- return fmt.Errorf("failed to get stock url: %w", err)
+ return "", fmt.Errorf("failed to get stock url: %w", err)
}
// Scrape the stock info.
info, err := client.StockInfo(ctx, stockUrl)
if err != nil {
- return fmt.Errorf("failed to get stock info: %w", err)
+ return "", fmt.Errorf("failed to get stock info: %w", err)
}
// Add stock info to the database.
id, err := database.AddStockInfo(ctx, db, info)
if err != nil {
- return fmt.Errorf("failed to add stock info: %w", err)
+ return "", fmt.Errorf("failed to add stock info: %w", err)
}
// Add the stock to the analyzer queue.
_, err = aQueue.Enqueue(ctx, analyzer.TaskInfo{ID: id})
if err != nil {
- return fmt.Errorf("failed to enqueue analysis task: %w", err)
+ return "", fmt.Errorf("failed to enqueue analysis task: %w", err)
}
slog.DebugContext(ctx, "Added stock info", "id", id)
- return nil
+ return id, nil
}
func getStockUrl(ctx context.Context, db database.TransactionExecutor, client *ibd.Client, symbol string) (string, error) {