diff options
Diffstat (limited to 'backend/internal/leader/manager/manager.go')
-rw-r--r-- | backend/internal/leader/manager/manager.go | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/backend/internal/leader/manager/manager.go b/backend/internal/leader/manager/manager.go new file mode 100644 index 0000000..61e27e0 --- /dev/null +++ b/backend/internal/leader/manager/manager.go @@ -0,0 +1,90 @@ +package manager + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/ansg191/ibd-trader-backend/internal/config" + "github.com/ansg191/ibd-trader-backend/internal/database" + "github.com/ansg191/ibd-trader-backend/internal/ibd" + "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/auth" + ibd2 "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape" + + "github.com/redis/go-redis/v9" + "github.com/robfig/cron/v3" +) + +type Manager struct { + db database.Database + Monitor *WorkerMonitor + Scraper *ibd2.Manager + Auth *auth.Manager +} + +func New( + ctx context.Context, + cfg *config.Config, + client *redis.Client, + db database.Database, + ibd *ibd.Client, +) (*Manager, error) { + scraperSchedule, err := cron.ParseStandard(cfg.IBD.Schedules.IBD50) + if err != nil { + return nil, fmt.Errorf("unable to parse IBD50 schedule: %w", err) + } + scraper, err := ibd2.New(ctx, ibd, db, client, scraperSchedule) + if err != nil { + return nil, err + } + + authSchedule, err := cron.ParseStandard(cfg.IBD.Schedules.Auth) + if err != nil { + return nil, fmt.Errorf("unable to parse Auth schedule: %w", err) + } + authManager, err := auth.New(ctx, db, client, authSchedule) + if err != nil { + return nil, err + } + + return &Manager{ + db: db, + Monitor: NewWorkerMonitor(client), + Scraper: scraper, + Auth: authManager, + }, nil +} + +func (m *Manager) Run(ctx context.Context) error { + if err := m.db.Migrate(ctx); err != nil { + slog.ErrorContext(ctx, "Unable to migrate database", "error", err) + return err + } + + var wg sync.WaitGroup + wg.Add(4) + + go func() { + defer wg.Done() + m.db.Maintenance(ctx) + }() + + go func() { + defer wg.Done() + m.Monitor.Start(ctx) + }() + + go func() { + defer wg.Done() + m.Scraper.Run(ctx) + }() + + go func() { + defer wg.Done() + m.Auth.Run(ctx) + }() + + wg.Wait() + return ctx.Err() +} |