package auth import ( "context" "database/sql" "errors" "fmt" "log/slog" "time" "github.com/ansg191/ibd-trader-backend/internal/database" "github.com/ansg191/ibd-trader-backend/internal/ibd" "github.com/ansg191/ibd-trader-backend/internal/keys" "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/auth" "github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue" "github.com/redis/go-redis/v9" ) const ( lockTimeout = 1 * time.Minute dequeueTimeout = 5 * time.Second ) func RunAuthScraper( ctx context.Context, client *ibd.Client, redis *redis.Client, db database.TransactionExecutor, kms keys.KeyManagementService, name string, ) error { queue, err := taskqueue.New( ctx, redis, auth.Queue, name, taskqueue.WithEncoding[auth.TaskInfo](auth.QueueEncoding), ) if err != nil { return err } for { select { case <-ctx.Done(): return ctx.Err() default: waitForTask(ctx, queue, client, db, kms) } } } func waitForTask( ctx context.Context, queue taskqueue.TaskQueue[auth.TaskInfo], client *ibd.Client, db database.TransactionExecutor, kms keys.KeyManagementService, ) { task, err := queue.Dequeue(ctx, lockTimeout, dequeueTimeout) if err != nil { slog.ErrorContext(ctx, "Failed to dequeue task", "error", err) return } if task == nil { // No task available. return } slog.DebugContext(ctx, "Picked up auth task", "task", task.ID, "user", task.Data.UserSubject) ch := make(chan error) defer close(ch) go func() { ch <- scrapeCookies(ctx, client, db, kms, task.Data.UserSubject) }() ticker := time.NewTicker(lockTimeout / 5) defer ticker.Stop() for { select { case <-ctx.Done(): // The context was canceled. Return early. return case <-ticker.C: // Extend the lock periodically. func() { ctx, cancel := context.WithTimeout(ctx, lockTimeout/5) defer cancel() err := queue.Extend(ctx, task.ID) if err != nil { slog.ErrorContext(ctx, "Failed to extend lock", "error", err) } }() case err = <-ch: // scrapeCookies has completed. if err != nil { slog.ErrorContext(ctx, "Failed to scrape cookies", "error", err) _, err = queue.Return(ctx, task.ID, err) if err != nil { slog.ErrorContext(ctx, "Failed to return task", "error", err) return } } else { err = queue.Complete(ctx, task.ID, "") if err != nil { slog.ErrorContext(ctx, "Failed to complete task", "error", err) return } slog.DebugContext(ctx, "Authenticated user", "user", task.Data.UserSubject) } return } } } func scrapeCookies( ctx context.Context, client *ibd.Client, db database.TransactionExecutor, kms keys.KeyManagementService, user string, ) error { ctx, cancel := context.WithTimeout(ctx, lockTimeout) defer cancel() // Check if the user has valid cookies done, err := hasValidCookies(ctx, db, user) if err != nil { return fmt.Errorf("failed to check cookies: %w", err) } if done { return nil } // Health check degraded cookies done, err = healthCheckDegradedCookies(ctx, client, db, kms, user) if err != nil { return fmt.Errorf("failed to health check cookies: %w", err) } if done { return nil } // No cookies are valid, so scrape new cookies return scrapeNewCookies(ctx, client, db, kms, user) } func hasValidCookies(ctx context.Context, db database.Executor, user string) (bool, error) { // Check if the user has non-degraded cookies row := db.QueryRowContext(ctx, ` SELECT 1 FROM ibd_tokens WHERE user_subject = $1 AND expires_at > NOW() AND degraded = FALSE;`, user) var exists bool err := row.Scan(&exists) if errors.Is(err, sql.ErrNoRows) { return false, nil } if err != nil { return false, fmt.Errorf("failed to get non-degraded cookies: %w", err) } return true, nil } func healthCheckDegradedCookies( ctx context.Context, client *ibd.Client, db database.Executor, kms keys.KeyManagementService, user string, ) (bool, error) { // Check if the user has degraded cookies cookies, err := database.GetCookies(ctx, db, kms, user, true) if err != nil { return false, fmt.Errorf("failed to get degraded cookies: %w", err) } valid := false for _, cookie := range cookies { slog.DebugContext(ctx, "Health checking cookie", "cookie", cookie.ID) // Health check the cookie up, err := client.UserInfo(ctx, cookie.ToHTTPCookie()) if err != nil { slog.ErrorContext(ctx, "Failed to health check cookie", "error", err) continue } if up.Status != ibd.UserStatusSubscriber { continue } // Cookie is valid valid = true // Update the cookie err = database.RepairCookie(ctx, db, cookie.ID) if err != nil { slog.ErrorContext(ctx, "Failed to repair cookie", "error", err) } } return valid, nil } func scrapeNewCookies( ctx context.Context, client *ibd.Client, db database.TransactionExecutor, kms keys.KeyManagementService, user string, ) error { // Get the user's credentials username, password, err := database.GetIBDCreds(ctx, db, kms, user) if err != nil { return fmt.Errorf("failed to get IBD credentials: %w", err) } // Scrape the user's cookies cookie, err := client.Authenticate(ctx, username, password) if err != nil { return fmt.Errorf("failed to authenticate user: %w", err) } // Store the cookie err = database.AddCookie(ctx, db, kms, user, cookie) if err != nil { return fmt.Errorf("failed to store cookie: %w", err) } return nil }