aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/worker/analyzer
diff options
context:
space:
mode:
authorGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-08 16:53:59 -0700
committerGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-08 16:53:59 -0700
commitf34b92ded11b07f78575ac62c260a380c468e5ea (patch)
tree8ffdc68ed0f2e253e7f9feff3aa90a1182e5946c /backend/internal/worker/analyzer
parenta439618cdc8168bad617d04875697b572f3ed41d (diff)
downloadibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.gz
ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.zst
ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.zip
Rework redis taskqueue to store task results
Diffstat (limited to 'backend/internal/worker/analyzer')
-rw-r--r--backend/internal/worker/analyzer/analyzer.go49
1 files changed, 28 insertions, 21 deletions
diff --git a/backend/internal/worker/analyzer/analyzer.go b/backend/internal/worker/analyzer/analyzer.go
index ea8069e..20621dd 100644
--- a/backend/internal/worker/analyzer/analyzer.go
+++ b/backend/internal/worker/analyzer/analyzer.go
@@ -64,10 +64,17 @@ func waitForTask(
return
}
- ch := make(chan error)
- defer close(ch)
+ errCh := make(chan error)
+ resCh := make(chan string)
+ defer close(errCh)
+ defer close(resCh)
go func() {
- ch <- analyzeStock(ctx, analyzer, db, task.Data.ID)
+ res, err := analyzeStock(ctx, analyzer, db, task.Data.ID)
+ if err != nil {
+ errCh <- err
+ return
+ }
+ resCh <- res
}()
ticker := time.NewTicker(lockTimeout / 5)
@@ -89,32 +96,32 @@ func waitForTask(
slog.ErrorContext(ctx, "Failed to extend lock", "error", err)
}
}()
- case err = <-ch:
- // scrapeUrl has completed.
+ case err = <-errCh:
+ // analyzeStock has errored.
+ slog.ErrorContext(ctx, "Failed to analyze", "error", err)
+ _, err = queue.Return(ctx, task.ID, err)
if err != nil {
- slog.ErrorContext(ctx, "Failed to analyze", "error", err)
- _, err = queue.Return(ctx, task.ID, err)
- if err != nil {
- slog.ErrorContext(ctx, "Failed to return task", "error", err)
- return
- }
- } else {
- slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID)
- err = queue.Complete(ctx, task.ID, nil)
- if err != nil {
- slog.ErrorContext(ctx, "Failed to complete task", "error", err)
- return
- }
+ slog.ErrorContext(ctx, "Failed to return task", "error", err)
+ return
+ }
+ return
+ case res := <-resCh:
+ // analyzeStock has completed successfully.
+ slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID, "result", res)
+ err = queue.Complete(ctx, task.ID, res)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to complete task", "error", err)
+ return
}
return
}
}
}
-func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) error {
+func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) (string, error) {
info, err := database.GetStockInfo(ctx, db, id)
if err != nil {
- return err
+ return "", err
}
analysis, err := a.Analyze(
@@ -124,7 +131,7 @@ func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor
info.ChartAnalysis,
)
if err != nil {
- return err
+ return "", err
}
return database.AddAnalysis(ctx, db, id, analysis)