aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/worker/scraper
diff options
context:
space:
mode:
authorGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-08 16:53:59 -0700
committerGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-08 16:53:59 -0700
commitf34b92ded11b07f78575ac62c260a380c468e5ea (patch)
tree8ffdc68ed0f2e253e7f9feff3aa90a1182e5946c /backend/internal/worker/scraper
parenta439618cdc8168bad617d04875697b572f3ed41d (diff)
downloadibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.gz
ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.zst
ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.zip
Rework redis taskqueue to store task results
Diffstat (limited to 'backend/internal/worker/scraper')
-rw-r--r--backend/internal/worker/scraper/scraper.go55
1 files changed, 31 insertions, 24 deletions
diff --git a/backend/internal/worker/scraper/scraper.go b/backend/internal/worker/scraper/scraper.go
index 4788834..c5c1b6c 100644
--- a/backend/internal/worker/scraper/scraper.go
+++ b/backend/internal/worker/scraper/scraper.go
@@ -77,10 +77,17 @@ func waitForTask(
return
}
- ch := make(chan error)
+ errCh := make(chan error)
+ resCh := make(chan string)
+ defer close(errCh)
+ defer close(resCh)
go func() {
- defer close(ch)
- ch <- scrapeUrl(ctx, client, db, aQueue, task.Data.Symbol)
+ res, err := scrapeUrl(ctx, client, db, aQueue, task.Data.Symbol)
+ if err != nil {
+ errCh <- err
+ return
+ }
+ resCh <- res
}()
ticker := time.NewTicker(lockTimeout / 5)
@@ -102,22 +109,22 @@ func waitForTask(
slog.ErrorContext(ctx, "Failed to extend lock", "error", err)
}
}()
- case err = <-ch:
- // scrapeUrl has completed.
+ case err = <-errCh:
+ // scrapeUrl has errored.
+ slog.ErrorContext(ctx, "Failed to scrape URL", "error", err)
+ _, err = queue.Return(ctx, task.ID, err)
if err != nil {
- slog.ErrorContext(ctx, "Failed to scrape URL", "error", err)
- _, err = queue.Return(ctx, task.ID, err)
- if err != nil {
- slog.ErrorContext(ctx, "Failed to return task", "error", err)
- return
- }
- } else {
- slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol)
- err = queue.Complete(ctx, task.ID, nil)
- if err != nil {
- slog.ErrorContext(ctx, "Failed to complete task", "error", err)
- return
- }
+ slog.ErrorContext(ctx, "Failed to return task", "error", err)
+ return
+ }
+ return
+ case res := <-resCh:
+ // scrapeUrl has completed successfully.
+ slog.DebugContext(ctx, "Scraped URL", "symbol", task.Data.Symbol)
+ err = queue.Complete(ctx, task.ID, res)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to complete task", "error", err)
+ return
}
return
}
@@ -130,36 +137,36 @@ func scrapeUrl(
db database.TransactionExecutor,
aQueue taskqueue.TaskQueue[analyzer.TaskInfo],
symbol string,
-) error {
+) (string, error) {
ctx, cancel := context.WithTimeout(ctx, lockTimeout)
defer cancel()
stockUrl, err := getStockUrl(ctx, db, client, symbol)
if err != nil {
- return fmt.Errorf("failed to get stock url: %w", err)
+ return "", fmt.Errorf("failed to get stock url: %w", err)
}
// Scrape the stock info.
info, err := client.StockInfo(ctx, stockUrl)
if err != nil {
- return fmt.Errorf("failed to get stock info: %w", err)
+ return "", fmt.Errorf("failed to get stock info: %w", err)
}
// Add stock info to the database.
id, err := database.AddStockInfo(ctx, db, info)
if err != nil {
- return fmt.Errorf("failed to add stock info: %w", err)
+ return "", fmt.Errorf("failed to add stock info: %w", err)
}
// Add the stock to the analyzer queue.
_, err = aQueue.Enqueue(ctx, analyzer.TaskInfo{ID: id})
if err != nil {
- return fmt.Errorf("failed to enqueue analysis task: %w", err)
+ return "", fmt.Errorf("failed to enqueue analysis task: %w", err)
}
slog.DebugContext(ctx, "Added stock info", "id", id)
- return nil
+ return id, nil
}
func getStockUrl(ctx context.Context, db database.TransactionExecutor, client *ibd.Client, symbol string) (string, error) {
3a48954586e613c7fd9bd3a7c22adda638eae809&showmsg=1&follow=1'>Expand)AuthorFilesLines 2022-02-07Update transpiler.test.jsGravatar Jarred Sumner 1-1/+3 2022-02-07[TS] Implement `import {type foo} from 'bar';` (type inside clause)Gravatar Jarred Sumner 2-20/+112 2022-02-07[bun-macro-relay] resolve the artifact directoryGravatar Jarred Sumner 1-11/+2 2022-02-06Update README.mdGravatar Jarred Sumner 1-1/+0 2022-02-06Update README.mdGravatar Jarred Sumner 1-0/+1 2022-02-06Little docGravatar Jarred Sumner 2-9/+49 2022-02-06Update README.mdbun-v0.0.69Gravatar Jarred Sumner 1-0/+2 2022-02-06Update README.mdGravatar Jarred Sumner 1-2/+3 2022-02-06Update README.mdGravatar Jarred Sumner 1-2/+2 2022-02-06Update README.mdGravatar Jarred Sumner 1-2/+2 2022-02-06Update README.mdGravatar Jarred Sumner 1-0/+15 2022-02-05Add unit test for toml importsGravatar Jarred Sumner 2-0/+47 2022-02-05[TOML] Fix bug with [[arrays]]Gravatar Jarred Sumner 1-2/+5 2022-02-05Handle promise rejections in testsGravatar Jarred Sumner 2-19/+35 2022-02-05Update resolve_path.zigGravatar Jarred Sumner 1-37/+35 2022-02-05Always try to load bunfig.toml for `install`, `dev`, `bun`, `test`Gravatar Jarred Sumner 1-3/+25 2022-02-05()Gravatar Jarred Sumner 1-1/+1 2022-02-05One less memcpyGravatar Jarred Sumner 1-68/+100 2022-02-05Further reliability improvements to http clientGravatar Jarred Sumner 2-14/+62 2022-02-05Implement keep-alive but disable itGravatar Jarred Sumner 2-17/+126 2022-02-05Make bun-install slower but more reliable on Linux Kernel 5.5 and lowerGravatar Jarred SUmner 1-112/+31 2022-02-04Several reliability improvements to HTTPGravatar Jarred SUmner 12-141/+189 2022-02-04reminderGravatar Jarred Sumner 3-5/+6 2022-02-04`path.resolve()` passes testsGravatar Jarred Sumner 2-13/+81 2022-02-04Update multiple-var.jsGravatar Jarred Sumner 1-1/+2 2022-02-04:camera:Gravatar Jarred Sumner 74-340/+669 2022-02-04Update snippets.jsonGravatar Jarred Sumner 1-1/+3 2022-02-04Add integration test for reading .json files that have UTF-8 string literalsGravatar Jarred Sumner 2-0/+9 2022-02-04[http] fix segfaultGravatar Jarred Sumner 1-17/+25 2022-02-04[bun dev] Fix bug with serving static files on next.js apps introduced in af6...Gravatar Jarred Sumner 1-5/+7 2022-02-04Update types.zigGravatar Jarred Sumner 1-9/+10 2022-02-04Update test_command.zigGravatar Jarred Sumner 1-2/+0 2022-02-04`path.normalize()` tests passGravatar Jarred Sumner 2-146/+213 2022-02-03Fix test failures in path.joinGravatar Jarred Sumner 1-8/+115 2022-02-03Update mimalloc_arena.zigGravatar Jarred Sumner 1-0/+9 2022-02-03[bun test] Support multiple filesGravatar Jarred Sumner 1-2/+12 2022-02-03Update js_ast.zigGravatar Jarred Sumner 1-0/+1 2022-02-03Support loading multiple entry points by changing what `bun:main` points toGravatar Jarred Sumner 6-4/+36 2022-02-03[bun install] Configurable max http retry countGravatar Jarred Sumner 1-0/+7 2022-02-03Missing newline in errors in bun installGravatar Jarred Sumner 1-4/+8 2022-02-03Fix bug with http clientGravatar Jarred Sumner 6-107/+101 2022-02-03Move detectFastRefresh to later so HTTP request handler starts fasterGravatar Jarred Sumner 1-2/+1 2022-02-03Fix bug with macro remaps in Bun.Transpiler apiGravatar Jarred Sumner 2-5/+8 2022-02-03Slight improvement to non-ascii file path handlingGravatar Jarred Sumner 4-18/+79 2022-02-02`path.relative` passes Node's tests (which also fixed bugs)Gravatar Jarred Sumner 8-283/+571