aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/worker/scraper
diff options
context:
space:
mode:
authorGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-05 18:55:10 -0700
committerGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-05 18:55:19 -0700
commitb96fcd1a54a46a95f98467b49a051564bc21c23c (patch)
tree93caeeb05f8d6310e241095608ea2428c749b18c /backend/internal/worker/scraper
downloadibd-trader-b96fcd1a54a46a95f98467b49a051564bc21c23c.tar.gz
ibd-trader-b96fcd1a54a46a95f98467b49a051564bc21c23c.tar.zst
ibd-trader-b96fcd1a54a46a95f98467b49a051564bc21c23c.zip
Initial Commit
Diffstat (limited to 'backend/internal/worker/scraper')
-rw-r--r--backend/internal/worker/scraper/scraper.go191
1 files changed, 191 insertions, 0 deletions
diff --git a/backend/internal/worker/scraper/scraper.go b/backend/internal/worker/scraper/scraper.go
new file mode 100644
index 0000000..a83d9ae
--- /dev/null
+++ b/backend/internal/worker/scraper/scraper.go
@@ -0,0 +1,191 @@
+package scraper
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "time"
+
+ "ibd-trader/internal/database"
+ "ibd-trader/internal/ibd"
+ "ibd-trader/internal/leader/manager/ibd/scrape"
+ "ibd-trader/internal/redis/taskqueue"
+ "ibd-trader/internal/worker/analyzer"
+
+ "github.com/redis/go-redis/v9"
+)
+
+const (
+ lockTimeout = 1 * time.Minute
+ dequeueTimeout = 5 * time.Second
+)
+
+func RunScraper(
+ ctx context.Context,
+ redis *redis.Client,
+ client *ibd.Client,
+ store database.StockStore,
+ name string,
+) error {
+ queue, err := taskqueue.New(
+ ctx,
+ redis,
+ scrape.Queue,
+ name,
+ taskqueue.WithEncoding[scrape.TaskInfo](scrape.QueueEncoding),
+ )
+ if err != nil {
+ return err
+ }
+
+ aQueue, err := taskqueue.New(
+ ctx,
+ redis,
+ analyzer.Queue,
+ name,
+ taskqueue.WithEncoding[analyzer.TaskInfo](analyzer.QueueEncoding),
+ )
+ if err != nil {
+ return err
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ waitForTask(ctx, queue, aQueue, client, store)
+ }
+ }
+}
+
+func waitForTask(
+ ctx context.Context,
+ queue taskqueue.TaskQueue[scrape.TaskInfo],
+ aQueue taskqueue.TaskQueue[analyzer.TaskInfo],
+ client *ibd.Client,
+ store database.StockStore,
+) {
+ task, err := queue.Dequeue(ctx, lockTimeout, dequeueTimeout)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to dequeue task", "error", err)
+ return
+ }
+ if task == nil {
+ // No task available.
+ return
+ }
+
+ ch := make(chan error)
+ go func() {
+ defer close(ch)
+ ch <- scrapeUrl(ctx, client, store, aQueue, task.Data.Symbol)
+ }()
+
+ ticker := time.NewTicker(lockTimeout / 5)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ // Context was canceled. Return early.
+ return
+ case <-ticker.C:
+ // Extend the lock periodically.
+ func() {
+ ctx, cancel := context.WithTimeout(ctx, lockTimeout/5)
+ defer cancel()
+
+ err := queue.Extend(ctx, task.ID)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to extend lock", "error", err)
+ }
+ }()
+ case err = <-ch:
+ // scrapeUrl has completed.
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to scrape URL", "error", err)
+ _, err = queue.Return(ctx, task.ID, err)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to return task", "error", err)
+ return
+ }
+ } else {
+ slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol)
+ err = queue.Complete(ctx, task.ID, nil)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to complete task", "error", err)
+ return
+ }
+ }
+ return
+ }
+ }
+}
+
+func scrapeUrl(
+ ctx context.Context,
+ client *ibd.Client,
+ store database.StockStore,
+ aQueue taskqueue.TaskQueue[analyzer.TaskInfo],
+ symbol string,
+) error {
+ ctx, cancel := context.WithTimeout(ctx, lockTimeout)
+ defer cancel()
+
+ stockUrl, err := getStockUrl(ctx, store, client, symbol)
+ if err != nil {
+ return fmt.Errorf("failed to get stock url: %w", err)
+ }
+
+ // Scrape the stock info.
+ info, err := client.StockInfo(ctx, stockUrl)
+ if err != nil {
+ return fmt.Errorf("failed to get stock info: %w", err)
+ }
+
+ // Add stock info to the database.
+ id, err := store.AddStockInfo(ctx, info)
+ if err != nil {
+ return fmt.Errorf("failed to add stock info: %w", err)
+ }
+
+ // Add the stock to the analyzer queue.
+ _, err = aQueue.Enqueue(ctx, analyzer.TaskInfo{ID: id})
+ if err != nil {
+ return fmt.Errorf("failed to enqueue analysis task: %w", err)
+ }
+
+ slog.DebugContext(ctx, "Added stock info", "id", id)
+
+ return nil
+}
+
+func getStockUrl(ctx context.Context, store database.StockStore, client *ibd.Client, symbol string) (string, error) {
+ // Get the stock from the database.
+ stock, err := store.GetStock(ctx, symbol)
+ if err == nil {
+ return stock.IBDUrl, nil
+ }
+ if !errors.Is(err, database.ErrStockNotFound) {
+ return "", fmt.Errorf("failed to get stock: %w", err)
+ }
+
+ // If stock isn't found in the database, get the stock from IBD.
+ stock, err = client.Search(ctx, symbol)
+ if errors.Is(err, ibd.ErrSymbolNotFound) {
+ return "", fmt.Errorf("symbol not found: %w", err)
+ }
+ if err != nil {
+ return "", fmt.Errorf("failed to search for symbol: %w", err)
+ }
+
+ // Add the stock to the database.
+ err = store.AddStock(ctx, stock)
+ if err != nil {
+ return "", fmt.Errorf("failed to add stock: %w", err)
+ }
+
+ return stock.IBDUrl, nil
+}