package analyzer import ( "context" "log/slog" "time" "github.com/ansg191/ibd-trader/backend/internal/analyzer" "github.com/ansg191/ibd-trader/backend/internal/database" "github.com/ansg191/ibd-trader/backend/internal/redis/taskqueue" "github.com/redis/go-redis/v9" ) const ( Queue = "analyzer" QueueEncoding = taskqueue.EncodingJSON lockTimeout = 1 * time.Minute dequeueTimeout = 5 * time.Second ) func RunAnalyzer( ctx context.Context, redis *redis.Client, analyzer analyzer.Analyzer, db database.Executor, name string, ) error { queue, err := taskqueue.New( ctx, redis, Queue, name, taskqueue.WithEncoding[TaskInfo](QueueEncoding), ) if err != nil { return err } for { select { case <-ctx.Done(): return ctx.Err() default: waitForTask(ctx, queue, analyzer, db) } } } func waitForTask( ctx context.Context, queue taskqueue.TaskQueue[TaskInfo], analyzer analyzer.Analyzer, db database.Executor, ) { task, err := queue.Dequeue(ctx, lockTimeout, dequeueTimeout) if err != nil { slog.ErrorContext(ctx, "Failed to dequeue task", "error", err) return } if task == nil { // No task available. return } errCh := make(chan error) resCh := make(chan string) defer close(errCh) defer close(resCh) go func() { res, err := analyzeStock(ctx, analyzer, db, task.Data.ID) if err != nil { errCh <- err return } resCh <- res }() ticker := time.NewTicker(lockTimeout / 5) defer ticker.Stop() for { select { case <-ctx.Done(): // Context was canceled. Return early. return case <-ticker.C: // Extend the lock periodically. func() { ctx, cancel := context.WithTimeout(ctx, lockTimeout/5) defer cancel() err := queue.Extend(ctx, task.ID) if err != nil { slog.ErrorContext(ctx, "Failed to extend lock", "error", err) } }() case err = <-errCh: // analyzeStock has errored. slog.ErrorContext(ctx, "Failed to analyze", "error", err) _, err = queue.Return(ctx, task.ID, err) if err != nil { slog.ErrorContext(ctx, "Failed to return task", "error", err) return } return case res := <-resCh: // analyzeStock has completed successfully. slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID, "result", res) err = queue.Complete(ctx, task.ID, res) if err != nil { slog.ErrorContext(ctx, "Failed to complete task", "error", err) return } return } } } func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) (string, error) { info, err := database.GetStockInfo(ctx, db, id) if err != nil { return "", err } analysis, err := a.Analyze( ctx, info.Symbol, info.Price, info.ChartAnalysis, ) if err != nil { return "", err } return database.AddAnalysis(ctx, db, id, analysis) } type TaskInfo struct { ID string `json:"id"` }