aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/worker/analyzer
diff options
context:
space:
mode:
authorGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-11 13:15:50 -0700
committerGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-11 13:15:50 -0700
commit6a3c21fb0b1c126849f2bbff494403bbe901448e (patch)
tree5d7805524357c2c8a9819c39d2051a4e3633a1d5 /backend/internal/worker/analyzer
parent29c6040a51616e9e4cf6c70ee16391b2a3b238c9 (diff)
parentf34b92ded11b07f78575ac62c260a380c468e5ea (diff)
downloadibd-trader-6a3c21fb0b1c126849f2bbff494403bbe901448e.tar.gz
ibd-trader-6a3c21fb0b1c126849f2bbff494403bbe901448e.tar.zst
ibd-trader-6a3c21fb0b1c126849f2bbff494403bbe901448e.zip
Merge remote-tracking branch 'backend/main'
Diffstat (limited to 'backend/internal/worker/analyzer')
-rw-r--r--backend/internal/worker/analyzer/analyzer.go142
1 files changed, 142 insertions, 0 deletions
diff --git a/backend/internal/worker/analyzer/analyzer.go b/backend/internal/worker/analyzer/analyzer.go
new file mode 100644
index 0000000..20621dd
--- /dev/null
+++ b/backend/internal/worker/analyzer/analyzer.go
@@ -0,0 +1,142 @@
+package analyzer
+
+import (
+ "context"
+ "log/slog"
+ "time"
+
+ "github.com/ansg191/ibd-trader-backend/internal/analyzer"
+ "github.com/ansg191/ibd-trader-backend/internal/database"
+ "github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue"
+
+ "github.com/redis/go-redis/v9"
+)
+
+const (
+ Queue = "analyzer"
+ QueueEncoding = taskqueue.EncodingJSON
+
+ lockTimeout = 1 * time.Minute
+ dequeueTimeout = 5 * time.Second
+)
+
+func RunAnalyzer(
+ ctx context.Context,
+ redis *redis.Client,
+ analyzer analyzer.Analyzer,
+ db database.Executor,
+ name string,
+) error {
+ queue, err := taskqueue.New(
+ ctx,
+ redis,
+ Queue,
+ name,
+ taskqueue.WithEncoding[TaskInfo](QueueEncoding),
+ )
+ if err != nil {
+ return err
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ waitForTask(ctx, queue, analyzer, db)
+ }
+ }
+}
+
+func waitForTask(
+ ctx context.Context,
+ queue taskqueue.TaskQueue[TaskInfo],
+ analyzer analyzer.Analyzer,
+ db database.Executor,
+) {
+ task, err := queue.Dequeue(ctx, lockTimeout, dequeueTimeout)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to dequeue task", "error", err)
+ return
+ }
+ if task == nil {
+ // No task available.
+ return
+ }
+
+ errCh := make(chan error)
+ resCh := make(chan string)
+ defer close(errCh)
+ defer close(resCh)
+ go func() {
+ res, err := analyzeStock(ctx, analyzer, db, task.Data.ID)
+ if err != nil {
+ errCh <- err
+ return
+ }
+ resCh <- res
+ }()
+
+ ticker := time.NewTicker(lockTimeout / 5)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ // Context was canceled. Return early.
+ return
+ case <-ticker.C:
+ // Extend the lock periodically.
+ func() {
+ ctx, cancel := context.WithTimeout(ctx, lockTimeout/5)
+ defer cancel()
+
+ err := queue.Extend(ctx, task.ID)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to extend lock", "error", err)
+ }
+ }()
+ case err = <-errCh:
+ // analyzeStock has errored.
+ slog.ErrorContext(ctx, "Failed to analyze", "error", err)
+ _, err = queue.Return(ctx, task.ID, err)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to return task", "error", err)
+ return
+ }
+ return
+ case res := <-resCh:
+ // analyzeStock has completed successfully.
+ slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID, "result", res)
+ err = queue.Complete(ctx, task.ID, res)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to complete task", "error", err)
+ return
+ }
+ return
+ }
+ }
+}
+
+func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) (string, error) {
+ info, err := database.GetStockInfo(ctx, db, id)
+ if err != nil {
+ return "", err
+ }
+
+ analysis, err := a.Analyze(
+ ctx,
+ info.Symbol,
+ info.Price,
+ info.ChartAnalysis,
+ )
+ if err != nil {
+ return "", err
+ }
+
+ return database.AddAnalysis(ctx, db, id, analysis)
+}
+
+type TaskInfo struct {
+ ID string `json:"id"`
+}