aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/worker/auth
diff options
context:
space:
mode:
authorGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-05 18:55:10 -0700
committerGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-05 18:55:19 -0700
commitb96fcd1a54a46a95f98467b49a051564bc21c23c (patch)
tree93caeeb05f8d6310e241095608ea2428c749b18c /backend/internal/worker/auth
downloadibd-trader-b96fcd1a54a46a95f98467b49a051564bc21c23c.tar.gz
ibd-trader-b96fcd1a54a46a95f98467b49a051564bc21c23c.tar.zst
ibd-trader-b96fcd1a54a46a95f98467b49a051564bc21c23c.zip
Initial Commit
Diffstat (limited to 'backend/internal/worker/auth')
-rw-r--r--backend/internal/worker/auth/auth.go228
1 files changed, 228 insertions, 0 deletions
diff --git a/backend/internal/worker/auth/auth.go b/backend/internal/worker/auth/auth.go
new file mode 100644
index 0000000..e1c6661
--- /dev/null
+++ b/backend/internal/worker/auth/auth.go
@@ -0,0 +1,228 @@
+package auth
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "time"
+
+ "ibd-trader/internal/database"
+ "ibd-trader/internal/ibd"
+ "ibd-trader/internal/leader/manager/ibd/auth"
+ "ibd-trader/internal/redis/taskqueue"
+
+ "github.com/redis/go-redis/v9"
+)
+
+const (
+ lockTimeout = 1 * time.Minute
+ dequeueTimeout = 5 * time.Second
+)
+
+func RunAuthScraper(
+ ctx context.Context,
+ client *ibd.Client,
+ redis *redis.Client,
+ users database.UserStore,
+ cookies database.CookieStore,
+ name string,
+) error {
+ queue, err := taskqueue.New(
+ ctx,
+ redis,
+ auth.Queue,
+ name,
+ taskqueue.WithEncoding[auth.TaskInfo](auth.QueueEncoding),
+ )
+ if err != nil {
+ return err
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ waitForTask(ctx, queue, client, users, cookies)
+ }
+ }
+}
+
+func waitForTask(
+ ctx context.Context,
+ queue taskqueue.TaskQueue[auth.TaskInfo],
+ client *ibd.Client,
+ users database.UserStore,
+ cookies database.CookieStore,
+) {
+ task, err := queue.Dequeue(ctx, lockTimeout, dequeueTimeout)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to dequeue task", "error", err)
+ return
+ }
+ if task == nil {
+ // No task available.
+ return
+ }
+ slog.DebugContext(ctx, "Picked up auth task", "task", task.ID, "user", task.Data.UserSubject)
+
+ ch := make(chan error)
+ defer close(ch)
+ go func() {
+ ch <- scrapeCookies(ctx, client, users, cookies, task.Data.UserSubject)
+ }()
+
+ ticker := time.NewTicker(lockTimeout / 5)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ // The context was canceled. Return early.
+ return
+ case <-ticker.C:
+ // Extend the lock periodically.
+ func() {
+ ctx, cancel := context.WithTimeout(ctx, lockTimeout/5)
+ defer cancel()
+
+ err := queue.Extend(ctx, task.ID)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to extend lock", "error", err)
+ }
+ }()
+ case err = <-ch:
+ // scrapeCookies has completed.
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to scrape cookies", "error", err)
+ _, err = queue.Return(ctx, task.ID, err)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to return task", "error", err)
+ return
+ }
+ } else {
+ err = queue.Complete(ctx, task.ID, nil)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to complete task", "error", err)
+ return
+ }
+ slog.DebugContext(ctx, "Authenticated user", "user", task.Data.UserSubject)
+ }
+ return
+ }
+ }
+}
+
+func scrapeCookies(
+ ctx context.Context,
+ client *ibd.Client,
+ users database.UserStore,
+ store database.CookieStore,
+ user string,
+) error {
+ ctx, cancel := context.WithTimeout(ctx, lockTimeout)
+ defer cancel()
+
+ // Check if the user has valid cookies
+ done, err := hasValidCookies(ctx, store, user)
+ if err != nil {
+ return fmt.Errorf("failed to check cookies: %w", err)
+ }
+ if done {
+ return nil
+ }
+
+ // Health check degraded cookies
+ done, err = healthCheckDegradedCookies(ctx, client, store, user)
+ if err != nil {
+ return fmt.Errorf("failed to health check cookies: %w", err)
+ }
+ if done {
+ return nil
+ }
+
+ // No cookies are valid, so scrape new cookies
+ return scrapeNewCookies(ctx, client, users, store, user)
+}
+
+func hasValidCookies(ctx context.Context, store database.CookieStore, user string) (bool, error) {
+ // Check if the user has non-degraded cookies
+ cookies, err := store.GetCookies(ctx, user, false)
+ if err != nil {
+ return false, fmt.Errorf("failed to get non-degraded cookies: %w", err)
+ }
+
+ // If the user has non-degraded cookies, return true
+ if cookies != nil && len(cookies) > 0 {
+ return true, nil
+ }
+ return false, nil
+}
+
+func healthCheckDegradedCookies(
+ ctx context.Context,
+ client *ibd.Client,
+ store database.CookieStore,
+ user string,
+) (bool, error) {
+ // Check if the user has degraded cookies
+ cookies, err := store.GetCookies(ctx, user, true)
+ if err != nil {
+ return false, fmt.Errorf("failed to get degraded cookies: %w", err)
+ }
+
+ valid := false
+ for _, cookie := range cookies {
+ slog.DebugContext(ctx, "Health checking cookie", "cookie", cookie.ID)
+
+ // Health check the cookie
+ up, err := client.UserInfo(ctx, cookie.ToHTTPCookie())
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to health check cookie", "error", err)
+ continue
+ }
+
+ if up.Status != ibd.UserStatusSubscriber {
+ continue
+ }
+
+ // Cookie is valid
+ valid = true
+
+ // Update the cookie
+ err = store.RepairCookie(ctx, cookie.ID)
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to repair cookie", "error", err)
+ }
+ }
+
+ return valid, nil
+}
+
+func scrapeNewCookies(
+ ctx context.Context,
+ client *ibd.Client,
+ users database.UserStore,
+ store database.CookieStore,
+ user string,
+) error {
+ // Get the user's credentials
+ username, password, err := users.GetIBDCreds(ctx, user)
+ if err != nil {
+ return fmt.Errorf("failed to get IBD credentials: %w", err)
+ }
+
+ // Scrape the user's cookies
+ cookie, err := client.Authenticate(ctx, username, password)
+ if err != nil {
+ return fmt.Errorf("failed to authenticate user: %w", err)
+ }
+
+ // Store the cookie
+ err = store.AddCookie(ctx, user, cookie)
+ if err != nil {
+ return fmt.Errorf("failed to store cookie: %w", err)
+ }
+
+ return nil
+}