diff options
author | 2024-08-08 16:53:59 -0700 | |
---|---|---|
committer | 2024-08-08 16:53:59 -0700 | |
commit | f34b92ded11b07f78575ac62c260a380c468e5ea (patch) | |
tree | 8ffdc68ed0f2e253e7f9feff3aa90a1182e5946c /backend/internal/worker | |
parent | a439618cdc8168bad617d04875697b572f3ed41d (diff) | |
download | ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.gz ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.zst ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.zip |
Rework redis taskqueue to store task results
Diffstat (limited to 'backend/internal/worker')
-rw-r--r-- | backend/internal/worker/analyzer/analyzer.go | 49 | ||||
-rw-r--r-- | backend/internal/worker/auth/auth.go | 2 | ||||
-rw-r--r-- | backend/internal/worker/scraper/scraper.go | 55 |
3 files changed, 60 insertions, 46 deletions
diff --git a/backend/internal/worker/analyzer/analyzer.go b/backend/internal/worker/analyzer/analyzer.go index ea8069e..20621dd 100644 --- a/backend/internal/worker/analyzer/analyzer.go +++ b/backend/internal/worker/analyzer/analyzer.go @@ -64,10 +64,17 @@ func waitForTask( return } - ch := make(chan error) - defer close(ch) + errCh := make(chan error) + resCh := make(chan string) + defer close(errCh) + defer close(resCh) go func() { - ch <- analyzeStock(ctx, analyzer, db, task.Data.ID) + res, err := analyzeStock(ctx, analyzer, db, task.Data.ID) + if err != nil { + errCh <- err + return + } + resCh <- res }() ticker := time.NewTicker(lockTimeout / 5) @@ -89,32 +96,32 @@ func waitForTask( slog.ErrorContext(ctx, "Failed to extend lock", "error", err) } }() - case err = <-ch: - // scrapeUrl has completed. + case err = <-errCh: + // analyzeStock has errored. + slog.ErrorContext(ctx, "Failed to analyze", "error", err) + _, err = queue.Return(ctx, task.ID, err) if err != nil { - slog.ErrorContext(ctx, "Failed to analyze", "error", err) - _, err = queue.Return(ctx, task.ID, err) - if err != nil { - slog.ErrorContext(ctx, "Failed to return task", "error", err) - return - } - } else { - slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID) - err = queue.Complete(ctx, task.ID, nil) - if err != nil { - slog.ErrorContext(ctx, "Failed to complete task", "error", err) - return - } + slog.ErrorContext(ctx, "Failed to return task", "error", err) + return + } + return + case res := <-resCh: + // analyzeStock has completed successfully. + slog.DebugContext(ctx, "Analyzed ID", "id", task.Data.ID, "result", res) + err = queue.Complete(ctx, task.ID, res) + if err != nil { + slog.ErrorContext(ctx, "Failed to complete task", "error", err) + return } return } } } -func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) error { +func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor, id string) (string, error) { info, err := database.GetStockInfo(ctx, db, id) if err != nil { - return err + return "", err } analysis, err := a.Analyze( @@ -124,7 +131,7 @@ func analyzeStock(ctx context.Context, a analyzer.Analyzer, db database.Executor info.ChartAnalysis, ) if err != nil { - return err + return "", err } return database.AddAnalysis(ctx, db, id, analysis) diff --git a/backend/internal/worker/auth/auth.go b/backend/internal/worker/auth/auth.go index 2043b5e..0daa112 100644 --- a/backend/internal/worker/auth/auth.go +++ b/backend/internal/worker/auth/auth.go @@ -104,7 +104,7 @@ func waitForTask( return } } else { - err = queue.Complete(ctx, task.ID, nil) + err = queue.Complete(ctx, task.ID, "") if err != nil { slog.ErrorContext(ctx, "Failed to complete task", "error", err) return diff --git a/backend/internal/worker/scraper/scraper.go b/backend/internal/worker/scraper/scraper.go index 4788834..c5c1b6c 100644 --- a/backend/internal/worker/scraper/scraper.go +++ b/backend/internal/worker/scraper/scraper.go @@ -77,10 +77,17 @@ func waitForTask( return } - ch := make(chan error) + errCh := make(chan error) + resCh := make(chan string) + defer close(errCh) + defer close(resCh) go func() { - defer close(ch) - ch <- scrapeUrl(ctx, client, db, aQueue, task.Data.Symbol) + res, err := scrapeUrl(ctx, client, db, aQueue, task.Data.Symbol) + if err != nil { + errCh <- err + return + } + resCh <- res }() ticker := time.NewTicker(lockTimeout / 5) @@ -102,22 +109,22 @@ func waitForTask( slog.ErrorContext(ctx, "Failed to extend lock", "error", err) } }() - case err = <-ch: - // scrapeUrl has completed. + case err = <-errCh: + // scrapeUrl has errored. + slog.ErrorContext(ctx, "Failed to scrape URL", "error", err) + _, err = queue.Return(ctx, task.ID, err) if err != nil { - slog.ErrorContext(ctx, "Failed to scrape URL", "error", err) - _, err = queue.Return(ctx, task.ID, err) - if err != nil { - slog.ErrorContext(ctx, "Failed to return task", "error", err) - return - } - } else { - slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol) - err = queue.Complete(ctx, task.ID, nil) - if err != nil { - slog.ErrorContext(ctx, "Failed to complete task", "error", err) - return - } + slog.ErrorContext(ctx, "Failed to return task", "error", err) + return + } + return + case res := <-resCh: + // scrapeUrl has completed successfully. + slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol) + err = queue.Complete(ctx, task.ID, res) + if err != nil { + slog.ErrorContext(ctx, "Failed to complete task", "error", err) + return } return } @@ -130,36 +137,36 @@ func scrapeUrl( db database.TransactionExecutor, aQueue taskqueue.TaskQueue[analyzer.TaskInfo], symbol string, -) error { +) (string, error) { ctx, cancel := context.WithTimeout(ctx, lockTimeout) defer cancel() stockUrl, err := getStockUrl(ctx, db, client, symbol) if err != nil { - return fmt.Errorf("failed to get stock url: %w", err) + return "", fmt.Errorf("failed to get stock url: %w", err) } // Scrape the stock info. info, err := client.StockInfo(ctx, stockUrl) if err != nil { - return fmt.Errorf("failed to get stock info: %w", err) + return "", fmt.Errorf("failed to get stock info: %w", err) } // Add stock info to the database. id, err := database.AddStockInfo(ctx, db, info) if err != nil { - return fmt.Errorf("failed to add stock info: %w", err) + return "", fmt.Errorf("failed to add stock info: %w", err) } // Add the stock to the analyzer queue. _, err = aQueue.Enqueue(ctx, analyzer.TaskInfo{ID: id}) if err != nil { - return fmt.Errorf("failed to enqueue analysis task: %w", err) + return "", fmt.Errorf("failed to enqueue analysis task: %w", err) } slog.DebugContext(ctx, "Added stock info", "id", id) - return nil + return id, nil } func getStockUrl(ctx context.Context, db database.TransactionExecutor, client *ibd.Client, symbol string) (string, error) { |