aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/leader/manager/manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'backend/internal/leader/manager/manager.go')
-rw-r--r--backend/internal/leader/manager/manager.go90
1 files changed, 90 insertions, 0 deletions
diff --git a/backend/internal/leader/manager/manager.go b/backend/internal/leader/manager/manager.go
new file mode 100644
index 0000000..61e27e0
--- /dev/null
+++ b/backend/internal/leader/manager/manager.go
@@ -0,0 +1,90 @@
+package manager
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "sync"
+
+ "github.com/ansg191/ibd-trader-backend/internal/config"
+ "github.com/ansg191/ibd-trader-backend/internal/database"
+ "github.com/ansg191/ibd-trader-backend/internal/ibd"
+ "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/auth"
+ ibd2 "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape"
+
+ "github.com/redis/go-redis/v9"
+ "github.com/robfig/cron/v3"
+)
+
+type Manager struct {
+ db database.Database
+ Monitor *WorkerMonitor
+ Scraper *ibd2.Manager
+ Auth *auth.Manager
+}
+
+func New(
+ ctx context.Context,
+ cfg *config.Config,
+ client *redis.Client,
+ db database.Database,
+ ibd *ibd.Client,
+) (*Manager, error) {
+ scraperSchedule, err := cron.ParseStandard(cfg.IBD.Schedules.IBD50)
+ if err != nil {
+ return nil, fmt.Errorf("unable to parse IBD50 schedule: %w", err)
+ }
+ scraper, err := ibd2.New(ctx, ibd, db, client, scraperSchedule)
+ if err != nil {
+ return nil, err
+ }
+
+ authSchedule, err := cron.ParseStandard(cfg.IBD.Schedules.Auth)
+ if err != nil {
+ return nil, fmt.Errorf("unable to parse Auth schedule: %w", err)
+ }
+ authManager, err := auth.New(ctx, db, client, authSchedule)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Manager{
+ db: db,
+ Monitor: NewWorkerMonitor(client),
+ Scraper: scraper,
+ Auth: authManager,
+ }, nil
+}
+
+func (m *Manager) Run(ctx context.Context) error {
+ if err := m.db.Migrate(ctx); err != nil {
+ slog.ErrorContext(ctx, "Unable to migrate database", "error", err)
+ return err
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(4)
+
+ go func() {
+ defer wg.Done()
+ m.db.Maintenance(ctx)
+ }()
+
+ go func() {
+ defer wg.Done()
+ m.Monitor.Start(ctx)
+ }()
+
+ go func() {
+ defer wg.Done()
+ m.Scraper.Run(ctx)
+ }()
+
+ go func() {
+ defer wg.Done()
+ m.Auth.Run(ctx)
+ }()
+
+ wg.Wait()
+ return ctx.Err()
+}