diff options
author | 2024-08-05 18:55:10 -0700 | |
---|---|---|
committer | 2024-08-05 18:55:19 -0700 | |
commit | b96fcd1a54a46a95f98467b49a051564bc21c23c (patch) | |
tree | 93caeeb05f8d6310e241095608ea2428c749b18c /backend/internal/worker/auth | |
download | ibd-trader-b96fcd1a54a46a95f98467b49a051564bc21c23c.tar.gz ibd-trader-b96fcd1a54a46a95f98467b49a051564bc21c23c.tar.zst ibd-trader-b96fcd1a54a46a95f98467b49a051564bc21c23c.zip |
Initial Commit
Diffstat (limited to 'backend/internal/worker/auth')
-rw-r--r-- | backend/internal/worker/auth/auth.go | 228 |
1 files changed, 228 insertions, 0 deletions
diff --git a/backend/internal/worker/auth/auth.go b/backend/internal/worker/auth/auth.go new file mode 100644 index 0000000..e1c6661 --- /dev/null +++ b/backend/internal/worker/auth/auth.go @@ -0,0 +1,228 @@ +package auth + +import ( + "context" + "fmt" + "log/slog" + "time" + + "ibd-trader/internal/database" + "ibd-trader/internal/ibd" + "ibd-trader/internal/leader/manager/ibd/auth" + "ibd-trader/internal/redis/taskqueue" + + "github.com/redis/go-redis/v9" +) + +const ( + lockTimeout = 1 * time.Minute + dequeueTimeout = 5 * time.Second +) + +func RunAuthScraper( + ctx context.Context, + client *ibd.Client, + redis *redis.Client, + users database.UserStore, + cookies database.CookieStore, + name string, +) error { + queue, err := taskqueue.New( + ctx, + redis, + auth.Queue, + name, + taskqueue.WithEncoding[auth.TaskInfo](auth.QueueEncoding), + ) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + waitForTask(ctx, queue, client, users, cookies) + } + } +} + +func waitForTask( + ctx context.Context, + queue taskqueue.TaskQueue[auth.TaskInfo], + client *ibd.Client, + users database.UserStore, + cookies database.CookieStore, +) { + task, err := queue.Dequeue(ctx, lockTimeout, dequeueTimeout) + if err != nil { + slog.ErrorContext(ctx, "Failed to dequeue task", "error", err) + return + } + if task == nil { + // No task available. + return + } + slog.DebugContext(ctx, "Picked up auth task", "task", task.ID, "user", task.Data.UserSubject) + + ch := make(chan error) + defer close(ch) + go func() { + ch <- scrapeCookies(ctx, client, users, cookies, task.Data.UserSubject) + }() + + ticker := time.NewTicker(lockTimeout / 5) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // The context was canceled. Return early. + return + case <-ticker.C: + // Extend the lock periodically. + func() { + ctx, cancel := context.WithTimeout(ctx, lockTimeout/5) + defer cancel() + + err := queue.Extend(ctx, task.ID) + if err != nil { + slog.ErrorContext(ctx, "Failed to extend lock", "error", err) + } + }() + case err = <-ch: + // scrapeCookies has completed. + if err != nil { + slog.ErrorContext(ctx, "Failed to scrape cookies", "error", err) + _, err = queue.Return(ctx, task.ID, err) + if err != nil { + slog.ErrorContext(ctx, "Failed to return task", "error", err) + return + } + } else { + err = queue.Complete(ctx, task.ID, nil) + if err != nil { + slog.ErrorContext(ctx, "Failed to complete task", "error", err) + return + } + slog.DebugContext(ctx, "Authenticated user", "user", task.Data.UserSubject) + } + return + } + } +} + +func scrapeCookies( + ctx context.Context, + client *ibd.Client, + users database.UserStore, + store database.CookieStore, + user string, +) error { + ctx, cancel := context.WithTimeout(ctx, lockTimeout) + defer cancel() + + // Check if the user has valid cookies + done, err := hasValidCookies(ctx, store, user) + if err != nil { + return fmt.Errorf("failed to check cookies: %w", err) + } + if done { + return nil + } + + // Health check degraded cookies + done, err = healthCheckDegradedCookies(ctx, client, store, user) + if err != nil { + return fmt.Errorf("failed to health check cookies: %w", err) + } + if done { + return nil + } + + // No cookies are valid, so scrape new cookies + return scrapeNewCookies(ctx, client, users, store, user) +} + +func hasValidCookies(ctx context.Context, store database.CookieStore, user string) (bool, error) { + // Check if the user has non-degraded cookies + cookies, err := store.GetCookies(ctx, user, false) + if err != nil { + return false, fmt.Errorf("failed to get non-degraded cookies: %w", err) + } + + // If the user has non-degraded cookies, return true + if cookies != nil && len(cookies) > 0 { + return true, nil + } + return false, nil +} + +func healthCheckDegradedCookies( + ctx context.Context, + client *ibd.Client, + store database.CookieStore, + user string, +) (bool, error) { + // Check if the user has degraded cookies + cookies, err := store.GetCookies(ctx, user, true) + if err != nil { + return false, fmt.Errorf("failed to get degraded cookies: %w", err) + } + + valid := false + for _, cookie := range cookies { + slog.DebugContext(ctx, "Health checking cookie", "cookie", cookie.ID) + + // Health check the cookie + up, err := client.UserInfo(ctx, cookie.ToHTTPCookie()) + if err != nil { + slog.ErrorContext(ctx, "Failed to health check cookie", "error", err) + continue + } + + if up.Status != ibd.UserStatusSubscriber { + continue + } + + // Cookie is valid + valid = true + + // Update the cookie + err = store.RepairCookie(ctx, cookie.ID) + if err != nil { + slog.ErrorContext(ctx, "Failed to repair cookie", "error", err) + } + } + + return valid, nil +} + +func scrapeNewCookies( + ctx context.Context, + client *ibd.Client, + users database.UserStore, + store database.CookieStore, + user string, +) error { + // Get the user's credentials + username, password, err := users.GetIBDCreds(ctx, user) + if err != nil { + return fmt.Errorf("failed to get IBD credentials: %w", err) + } + + // Scrape the user's cookies + cookie, err := client.Authenticate(ctx, username, password) + if err != nil { + return fmt.Errorf("failed to authenticate user: %w", err) + } + + // Store the cookie + err = store.AddCookie(ctx, user, cookie) + if err != nil { + return fmt.Errorf("failed to store cookie: %w", err) + } + + return nil +} |