diff options
author | 2024-08-08 16:53:59 -0700 | |
---|---|---|
committer | 2024-08-08 16:53:59 -0700 | |
commit | f34b92ded11b07f78575ac62c260a380c468e5ea (patch) | |
tree | 8ffdc68ed0f2e253e7f9feff3aa90a1182e5946c /backend/internal/worker/analyzer | |
parent | a439618cdc8168bad617d04875697b572f3ed41d (diff) | |
download | ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.gz ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.zst ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.zip |
Rework redis taskqueue to store task results
Diffstat (limited to 'backend/internal/worker/analyzer')
-rw-r--r-- | backend/internal/worker/analyzer/analyzer.go | 49 |
1 files changed, 28 insertions, 21 deletions
diff --git a/backend/internal/worker/analyzer/analyzer.go b/backend/internal/worker/analyzer/analyzer.go index ea8069e..20621dd 100644 --- a/backend/internal/worker/analyzer/analyzer.go +++ b/backend/internal/worker/analyzer/analyzer.go @@ -64,10 +64,17 @@ func waitForTask( return } - ch := make(chan error) - defer close(ch) + errCh := make(chan error) + resCh := make(chan string) + defer close(errCh) + defer close(resCh) go func() { - ch <- analyzeStock(ctx, analyzer, db, task.Data.ID) + res, err := analyzeStock(ctx, analyzer, db, task.Data.ID) + if err != nil { + errCh <- err + return + } + resCh <- res }() ticker := time.NewTicker(lockTimeout / 5) @@ -89,32 +96,32 @@ func waitForTask( slog.ErrorContext(ctx, "Failed to extend lock", "error", err) } }() - case err = <-ch: - // scrapeUrl has completed. + case err = <-errCh: + // analyzeStock has errored. + slog.ErrorContext(ctx, "Failed to analyze", "error", err) + _, err = queue.Return(ctx, task.ID, err) if err != nil { - slog.ErrorContext(ctx, "Failed to analyze", "error", err) - _, err = queue.Return(ctx, task.ID, err) - if err != nil { - slog.ErrorContext(ctx, "Failed to return task", "error", err) - return - } - } else { - slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID) - err = queue.Complete(ctx, task.ID, nil) - if err != nil { - slog.ErrorContext(ctx, "Failed to complete task", "error", err) - return - } + slog.ErrorContext(ctx, "Failed to return task", "error", err) + return + } + return + case res := <-resCh: + // analyzeStock has completed successfully. + slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID, "result", res) + err = queue.Complete(ctx, task.ID, res) + if err != nil { + slog.ErrorContext(ctx, "Failed to complete task", "error", err) + return } return } } } -func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) error { +func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) (string, error) { info, err := database.GetStockInfo(ctx, db, id) if err != nil { - return err + return "", err } analysis, err := a.Analyze( @@ -124,7 +131,7 @@ func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor info.ChartAnalysis, ) if err != nil { - return err + return "", err } return database.AddAnalysis(ctx, db, id, analysis) |