diff options
author | 2024-08-11 13:15:50 -0700 | |
---|---|---|
committer | 2024-08-11 13:15:50 -0700 | |
commit | 6a3c21fb0b1c126849f2bbff494403bbe901448e (patch) | |
tree | 5d7805524357c2c8a9819c39d2051a4e3633a1d5 /backend/internal/worker/analyzer | |
parent | 29c6040a51616e9e4cf6c70ee16391b2a3b238c9 (diff) | |
parent | f34b92ded11b07f78575ac62c260a380c468e5ea (diff) | |
download | ibd-trader-6a3c21fb0b1c126849f2bbff494403bbe901448e.tar.gz ibd-trader-6a3c21fb0b1c126849f2bbff494403bbe901448e.tar.zst ibd-trader-6a3c21fb0b1c126849f2bbff494403bbe901448e.zip |
Merge remote-tracking branch 'backend/main'
Diffstat (limited to 'backend/internal/worker/analyzer')
-rw-r--r-- | backend/internal/worker/analyzer/analyzer.go | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/backend/internal/worker/analyzer/analyzer.go b/backend/internal/worker/analyzer/analyzer.go new file mode 100644 index 0000000..20621dd --- /dev/null +++ b/backend/internal/worker/analyzer/analyzer.go @@ -0,0 +1,142 @@ +package analyzer + +import ( + "context" + "log/slog" + "time" + + "github.com/ansg191/ibd-trader-backend/internal/analyzer" + "github.com/ansg191/ibd-trader-backend/internal/database" + "github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue" + + "github.com/redis/go-redis/v9" +) + +const ( + Queue = "analyzer" + QueueEncoding = taskqueue.EncodingJSON + + lockTimeout = 1 * time.Minute + dequeueTimeout = 5 * time.Second +) + +func RunAnalyzer( + ctx context.Context, + redis *redis.Client, + analyzer analyzer.Analyzer, + db database.Executor, + name string, +) error { + queue, err := taskqueue.New( + ctx, + redis, + Queue, + name, + taskqueue.WithEncoding[TaskInfo](QueueEncoding), + ) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + waitForTask(ctx, queue, analyzer, db) + } + } +} + +func waitForTask( + ctx context.Context, + queue taskqueue.TaskQueue[TaskInfo], + analyzer analyzer.Analyzer, + db database.Executor, +) { + task, err := queue.Dequeue(ctx, lockTimeout, dequeueTimeout) + if err != nil { + slog.ErrorContext(ctx, "Failed to dequeue task", "error", err) + return + } + if task == nil { + // No task available. + return + } + + errCh := make(chan error) + resCh := make(chan string) + defer close(errCh) + defer close(resCh) + go func() { + res, err := analyzeStock(ctx, analyzer, db, task.Data.ID) + if err != nil { + errCh <- err + return + } + resCh <- res + }() + + ticker := time.NewTicker(lockTimeout / 5) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // Context was canceled. Return early. + return + case <-ticker.C: + // Extend the lock periodically. + func() { + ctx, cancel := context.WithTimeout(ctx, lockTimeout/5) + defer cancel() + + err := queue.Extend(ctx, task.ID) + if err != nil { + slog.ErrorContext(ctx, "Failed to extend lock", "error", err) + } + }() + case err = <-errCh: + // analyzeStock has errored. + slog.ErrorContext(ctx, "Failed to analyze", "error", err) + _, err = queue.Return(ctx, task.ID, err) + if err != nil { + slog.ErrorContext(ctx, "Failed to return task", "error", err) + return + } + return + case res := <-resCh: + // analyzeStock has completed successfully. + slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID, "result", res) + err = queue.Complete(ctx, task.ID, res) + if err != nil { + slog.ErrorContext(ctx, "Failed to complete task", "error", err) + return + } + return + } + } +} + +func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) (string, error) { + info, err := database.GetStockInfo(ctx, db, id) + if err != nil { + return "", err + } + + analysis, err := a.Analyze( + ctx, + info.Symbol, + info.Price, + info.ChartAnalysis, + ) + if err != nil { + return "", err + } + + return database.AddAnalysis(ctx, db, id, analysis) +} + +type TaskInfo struct { + ID string `json:"id"` +} |