aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/worker
diff options
context:
space:
mode:
authorGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-08 16:53:59 -0700
committerGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-08 16:53:59 -0700
commitf34b92ded11b07f78575ac62c260a380c468e5ea (patch)
tree8ffdc68ed0f2e253e7f9feff3aa90a1182e5946c /backend/internal/worker
parenta439618cdc8168bad617d04875697b572f3ed41d (diff)
downloadibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.gz
ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.zst
ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.zip
Rework redis taskqueue to store task results
Diffstat (limited to 'backend/internal/worker')
-rw-r--r--backend/internal/worker/analyzer/analyzer.go49
-rw-r--r--backend/internal/worker/auth/auth.go2
-rw-r--r--backend/internal/worker/scraper/scraper.go55
3 files changed, 60 insertions, 46 deletions
diff --git a/backend/internal/worker/analyzer/analyzer.go b/backend/internal/worker/analyzer/analyzer.go
index ea8069e..20621dd 100644
--- a/backend/internal/worker/analyzer/analyzer.go
+++ b/backend/internal/worker/analyzer/analyzer.go
@@ -64,10 +64,17 @@ func waitForTask(
return
}
- ch := make(chan error)
- defer close(ch)
+ errCh := make(chan error)
+ resCh := make(chan string)
+ defer close(errCh)
+ defer close(resCh)
go func() {
- ch <- analyzeStock(ctx, analyzer, db, task.Data.ID)
+ res, err := analyzeStock(ctx, analyzer, db, task.Data.ID)
+ if err != nil {
+ errCh <- err
+ return
+ }
+ resCh <- res
}()
ticker := time.NewTicker(lockTimeout / 5)
@@ -89,32 +96,32 @@ func waitForTask(
slog.ErrorContext(ctx, "Failed to extend lock", "error", err)
}
}()
- case err = <-ch:
- // scrapeUrl has completed.
+ case err = <-errCh:
+ // analyzeStock has errored.
+ slog.ErrorContext(ctx, "Failed to analyze", "error", err)
+ _, err = queue.Return(ctx, task.ID, err)
if err != nil {
- slog.ErrorContext(ctx, "Failed to analyze", "error", err)
- _, err = queue.Return(ctx, task.ID, err)
- if err != nil {
- slog.ErrorContext(ctx, "Failed to return task", "error", err)
- return
- }
- } else {
- slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID)
- err = queue.Complete(ctx, task.ID, nil)
- if err != nil {
- slog.ErrorContext(ctx, "Failed to complete task", "error", err)
- return
- }
+ slog.ErrorContext(ctx, "Failed to return task", "error", err)
+ return
+ }
+ return
+ case res := <-resCh:
+ // analyzeStock has completed successfully.
+ slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID, "result", res)
+ err = queue.Complete(ctx, task.ID, res)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to complete task", "error", err)
+ return
}
return
}
}
}
-func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) error {
+func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) (string, error) {
info, err := database.GetStockInfo(ctx, db, id)
if err != nil {
- return err
+ return "", err
}
analysis, err := a.Analyze(
@@ -124,7 +131,7 @@ func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor
info.ChartAnalysis,
)
if err != nil {
- return err
+ return "", err
}
return database.AddAnalysis(ctx, db, id, analysis)
diff --git a/backend/internal/worker/auth/auth.go b/backend/internal/worker/auth/auth.go
index 2043b5e..0daa112 100644
--- a/backend/internal/worker/auth/auth.go
+++ b/backend/internal/worker/auth/auth.go
@@ -104,7 +104,7 @@ func waitForTask(
return
}
} else {
- err = queue.Complete(ctx, task.ID, nil)
+ err = queue.Complete(ctx, task.ID, "")
if err != nil {
slog.ErrorContext(ctx, "Failed to complete task", "error", err)
return
diff --git a/backend/internal/worker/scraper/scraper.go b/backend/internal/worker/scraper/scraper.go
index 4788834..c5c1b6c 100644
--- a/backend/internal/worker/scraper/scraper.go
+++ b/backend/internal/worker/scraper/scraper.go
@@ -77,10 +77,17 @@ func waitForTask(
return
}
- ch := make(chan error)
+ errCh := make(chan error)
+ resCh := make(chan string)
+ defer close(errCh)
+ defer close(resCh)
go func() {
- defer close(ch)
- ch <- scrapeUrl(ctx, client, db, aQueue, task.Data.Symbol)
+ res, err := scrapeUrl(ctx, client, db, aQueue, task.Data.Symbol)
+ if err != nil {
+ errCh <- err
+ return
+ }
+ resCh <- res
}()
ticker := time.NewTicker(lockTimeout / 5)
@@ -102,22 +109,22 @@ func waitForTask(
slog.ErrorContext(ctx, "Failed to extend lock", "error", err)
}
}()
- case err = <-ch:
- // scrapeUrl has completed.
+ case err = <-errCh:
+ // scrapeUrl has errored.
+ slog.ErrorContext(ctx, "Failed to scrape URL", "error", err)
+ _, err = queue.Return(ctx, task.ID, err)
if err != nil {
- slog.ErrorContext(ctx, "Failed to scrape URL", "error", err)
- _, err = queue.Return(ctx, task.ID, err)
- if err != nil {
- slog.ErrorContext(ctx, "Failed to return task", "error", err)
- return
- }
- } else {
- slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol)
- err = queue.Complete(ctx, task.ID, nil)
- if err != nil {
- slog.ErrorContext(ctx, "Failed to complete task", "error", err)
- return
- }
+ slog.ErrorContext(ctx, "Failed to return task", "error", err)
+ return
+ }
+ return
+ case res := <-resCh:
+ // scrapeUrl has completed successfully.
+ slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol)
+ err = queue.Complete(ctx, task.ID, res)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to complete task", "error", err)
+ return
}
return
}
@@ -130,36 +137,36 @@ func scrapeUrl(
db database.TransactionExecutor,
aQueue taskqueue.TaskQueue[analyzer.TaskInfo],
symbol string,
-) error {
+) (string, error) {
ctx, cancel := context.WithTimeout(ctx, lockTimeout)
defer cancel()
stockUrl, err := getStockUrl(ctx, db, client, symbol)
if err != nil {
- return fmt.Errorf("failed to get stock url: %w", err)
+ return "", fmt.Errorf("failed to get stock url: %w", err)
}
// Scrape the stock info.
info, err := client.StockInfo(ctx, stockUrl)
if err != nil {
- return fmt.Errorf("failed to get stock info: %w", err)
+ return "", fmt.Errorf("failed to get stock info: %w", err)
}
// Add stock info to the database.
id, err := database.AddStockInfo(ctx, db, info)
if err != nil {
- return fmt.Errorf("failed to add stock info: %w", err)
+ return "", fmt.Errorf("failed to add stock info: %w", err)
}
// Add the stock to the analyzer queue.
_, err = aQueue.Enqueue(ctx, analyzer.TaskInfo{ID: id})
if err != nil {
- return fmt.Errorf("failed to enqueue analysis task: %w", err)
+ return "", fmt.Errorf("failed to enqueue analysis task: %w", err)
}
slog.DebugContext(ctx, "Added stock info", "id", id)
- return nil
+ return id, nil
}
func getStockUrl(ctx context.Context, db database.TransactionExecutor, client *ibd.Client, symbol string) (string, error) {