aboutsummaryrefslogtreecommitdiff
path: root/backend/cmd
diff options
context:
space:
mode:
authorGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-11 13:15:50 -0700
committerGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-11 13:15:50 -0700
commit6a3c21fb0b1c126849f2bbff494403bbe901448e (patch)
tree5d7805524357c2c8a9819c39d2051a4e3633a1d5 /backend/cmd
parent29c6040a51616e9e4cf6c70ee16391b2a3b238c9 (diff)
parentf34b92ded11b07f78575ac62c260a380c468e5ea (diff)
downloadibd-trader-6a3c21fb0b1c126849f2bbff494403bbe901448e.tar.gz
ibd-trader-6a3c21fb0b1c126849f2bbff494403bbe901448e.tar.zst
ibd-trader-6a3c21fb0b1c126849f2bbff494403bbe901448e.zip
Merge remote-tracking branch 'backend/main'
Diffstat (limited to 'backend/cmd')
-rw-r--r--backend/cmd/main.go187
1 files changed, 187 insertions, 0 deletions
diff --git a/backend/cmd/main.go b/backend/cmd/main.go
new file mode 100644
index 0000000..2a34780
--- /dev/null
+++ b/backend/cmd/main.go
@@ -0,0 +1,187 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "log/slog"
+ "net/http"
+ "net/url"
+ "os"
+ "os/signal"
+ "time"
+
+ "github.com/ansg191/ibd-trader-backend/internal/analyzer/openai"
+ auth2 "github.com/ansg191/ibd-trader-backend/internal/auth"
+ "github.com/ansg191/ibd-trader-backend/internal/config"
+ "github.com/ansg191/ibd-trader-backend/internal/database"
+ "github.com/ansg191/ibd-trader-backend/internal/ibd"
+ "github.com/ansg191/ibd-trader-backend/internal/ibd/transport"
+ "github.com/ansg191/ibd-trader-backend/internal/ibd/transport/scrapfly"
+ "github.com/ansg191/ibd-trader-backend/internal/keys"
+ "github.com/ansg191/ibd-trader-backend/internal/leader/election"
+ "github.com/ansg191/ibd-trader-backend/internal/leader/manager"
+ "github.com/ansg191/ibd-trader-backend/internal/server"
+ "github.com/ansg191/ibd-trader-backend/internal/worker"
+
+ "github.com/lmittmann/tint"
+ "github.com/redis/go-redis/v9"
+)
+
+func main() {
+ // Load the config
+ cfg, err := config.New()
+ if err != nil {
+ log.Fatal("Unable to load config: ", err)
+ }
+
+ // Setup slog
+ var level slog.Level
+ if err = level.UnmarshalText([]byte(cfg.Log.Level)); err != nil {
+ log.Fatal("Unable to parse log level: ", err)
+ }
+ var logger *slog.Logger
+ opts := &tint.Options{
+ AddSource: cfg.Log.AddSource,
+ Level: level,
+ NoColor: !cfg.Log.Color,
+ }
+ logger = slog.New(tint.NewHandler(os.Stdout, opts))
+ slog.SetDefault(logger)
+
+ logger.Info(
+ "Starting IBD Trader...",
+ "logger.level", level,
+ )
+
+ // Create kms
+ kms, err := createKMS()
+ if err != nil {
+ log.Fatal("Unable to create KMS: ", err)
+ }
+
+ // Connect to the database
+ db, err := connectDB(logger, cfg, kms)
+ defer func(db database.Database) {
+ _ = db.Close()
+ }(db)
+ if err != nil {
+ log.Fatal("Unable to connect to database: ", err)
+ }
+
+ // Connect to redis
+ redisClient := redis.NewClient(&redis.Options{
+ Addr: cfg.Redis.Addr,
+ Password: cfg.Redis.Password,
+ })
+ defer redisClient.Close()
+
+ // Setup auth
+ auth, err := auth2.New(cfg)
+ if err != nil {
+ log.Fatal("Unable to setup auth: ", err)
+ }
+ _ = auth
+
+ // Setup IBD client
+ client, err := setupIBDClient(cfg, db, kms)
+ if err != nil {
+ log.Fatal("Unable to setup IBD client: ", err)
+ }
+
+ // Setup analyzer
+ analyzer := openai.NewAnalyzer(openai.WithDefaultConfig(cfg.Analyzer.OpenAI.APIKey))
+ _ = analyzer
+
+ // Setup context w/ signal handling
+ ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
+ defer cancel()
+
+ //// Start the server
+ //go func() {
+ // if err := server.StartServer(ctx, cfg, logger, db, auth, client, redisClient); err != nil {
+ // log.Fatal("Unable to start server: ", err)
+ // }
+ // // Cancel the context when the server stops
+ // cancel()
+ //}()
+
+ // Start the gRPC server
+ go func() {
+ s, err := server.New(ctx, cfg.Server.Port, db, redisClient, client, kms, cfg.KMS.GCP.String())
+ if err != nil {
+ log.Fatal("Unable to create gRPC server: ", err)
+ }
+ if err := s.Serve(ctx); err != nil {
+ slog.ErrorContext(ctx, "Unable to start gRPC server", "error", err)
+ }
+ // Cancel the context when the server stops
+ cancel()
+ }()
+
+ // Start the worker
+ go func() {
+ err := worker.StartWorker(
+ ctx,
+ client,
+ redisClient,
+ db,
+ kms,
+ analyzer,
+ )
+ if err != nil {
+ log.Fatal("Unable to start worker: ", err)
+ }
+ // Cancel the context when the worker stops
+ cancel()
+ }()
+
+ // Start leader election
+ election.RunOrDie(
+ ctx,
+ redisClient,
+ func(ctx context.Context) {
+ m, err := manager.New(ctx, cfg, redisClient, db, client)
+ if err != nil {
+ logger.Error("Unable to create manager", "error", err)
+ return
+ }
+ if err = m.Run(ctx); err != nil {
+ logger.Error("Manager exited with error", "error", err)
+ }
+ },
+ )
+}
+
+func setupIBDClient(cfg *config.Config, db database.Database, kms keys.KeyManagementService) (*ibd.Client, error) {
+ pUrl, err := url.Parse(cfg.IBD.ProxyURL)
+ if err != nil {
+ return nil, fmt.Errorf("unable to parse proxy URL: %w", err)
+ }
+ t := http.DefaultTransport.(*http.Transport).Clone()
+ t.Proxy = http.ProxyURL(pUrl)
+ transports := []transport.Transport{
+ transport.NewStandardTransport(&http.Client{Transport: t}), // Default proxied transport
+ scrapfly.New(http.DefaultClient, cfg.IBD.APIKey), // Scrapfly transport
+ }
+ client := ibd.NewClient(db, kms, transports...)
+ return client, nil
+}
+
+func createKMS() (keys.KeyManagementService, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+ return keys.NewGoogleKMS(ctx)
+}
+
+func connectDB(logger *slog.Logger, cfg *config.Config, kms keys.KeyManagementService) (database.Database, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ db, err := database.New(ctx, logger, cfg.DB.URL, kms, cfg.KMS.GCP.String())
+ if err != nil {
+ return nil, err
+ }
+
+ return db, nil
+}