diff options
Diffstat (limited to 'backend/internal/server2')
-rw-r--r-- | backend/internal/server2/idb/stock/v1/stock.go | 63 | ||||
-rw-r--r-- | backend/internal/server2/idb/user/v1/user.go | 94 | ||||
-rw-r--r-- | backend/internal/server2/operations.go | 130 | ||||
-rw-r--r-- | backend/internal/server2/server.go | 71 |
4 files changed, 358 insertions, 0 deletions
diff --git a/backend/internal/server2/idb/stock/v1/stock.go b/backend/internal/server2/idb/stock/v1/stock.go new file mode 100644 index 0000000..3a94c82 --- /dev/null +++ b/backend/internal/server2/idb/stock/v1/stock.go @@ -0,0 +1,63 @@ +package stock + +import ( + "context" + "fmt" + "log/slog" + + pb "ibd-trader/api/gen/idb/stock/v1" + "ibd-trader/internal/database" + "ibd-trader/internal/leader/manager/ibd/scrape" + "ibd-trader/internal/redis/taskqueue" + + "cloud.google.com/go/longrunning/autogen/longrunningpb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ScrapeOperationPrefix = "scrape" + +type Server struct { + pb.UnimplementedStockServiceServer + + db database.StockStore + queue taskqueue.TaskQueue[scrape.TaskInfo] +} + +func New(db database.StockStore, queue taskqueue.TaskQueue[scrape.TaskInfo]) *Server { + return &Server{db: db, queue: queue} +} + +func (s *Server) CreateStock(ctx context.Context, request *pb.CreateStockRequest) (*longrunningpb.Operation, error) { + task, err := s.queue.Enqueue(ctx, scrape.TaskInfo{Symbol: request.Symbol}) + if err != nil { + slog.ErrorContext(ctx, "failed to enqueue task", "err", err) + return nil, status.New(codes.Internal, "failed to enqueue task").Err() + } + op := &longrunningpb.Operation{ + Name: fmt.Sprintf("%s/%s", ScrapeOperationPrefix, task.ID.String()), + Metadata: new(anypb.Any), + Done: false, + Result: nil, + } + err = op.Metadata.MarshalFrom(&pb.StockScrapeOperationMetadata{ + Symbol: request.Symbol, + StartTime: timestamppb.New(task.ID.Timestamp()), + }) + if err != nil { + slog.ErrorContext(ctx, "failed to marshal metadata", "err", err) + return nil, status.New(codes.Internal, "failed to marshal metadata").Err() + } + return op, nil +} + +func (s *Server) GetStock(ctx context.Context, request *pb.GetStockRequest) (*pb.GetStockResponse, error) { + +} + +func (s *Server) ListStocks(ctx context.Context, request *pb.ListStocksRequest) (*pb.ListStocksResponse, error) { + //TODO implement me + panic("implement me") +} diff --git a/backend/internal/server2/idb/user/v1/user.go b/backend/internal/server2/idb/user/v1/user.go new file mode 100644 index 0000000..1866944 --- /dev/null +++ b/backend/internal/server2/idb/user/v1/user.go @@ -0,0 +1,94 @@ +package user + +import ( + "context" + "errors" + + pb "ibd-trader/api/gen/idb/user/v1" + "ibd-trader/internal/database" + + "github.com/mennanov/fmutils" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +type Server struct { + pb.UnimplementedUserServiceServer + + db database.UserStore +} + +func New(db database.UserStore) *Server { + return &Server{db: db} +} + +func (u *Server) CreateUser(ctx context.Context, request *pb.CreateUserRequest) (*pb.CreateUserResponse, error) { + err := u.db.AddUser(ctx, request.Subject) + if err != nil { + return nil, status.Errorf(codes.Internal, "unable to create user: %v", err) + } + + user, err := u.db.GetUser(ctx, request.Subject) + if err != nil { + return nil, status.Errorf(codes.Internal, "unable to get user: %v", err) + } + + return &pb.CreateUserResponse{ + User: &pb.User{ + Subject: user.Subject, + IbdUsername: user.IBDUsername, + IbdPassword: nil, + }, + }, nil +} + +func (u *Server) GetUser(ctx context.Context, request *pb.GetUserRequest) (*pb.GetUserResponse, error) { + user, err := u.db.GetUser(ctx, request.Subject) + if errors.Is(err, database.ErrUserNotFound) { + return nil, status.New(codes.NotFound, "user not found").Err() + } + if err != nil { + return nil, status.Errorf(codes.Internal, "unable to get user: %v", err) + } + + return &pb.GetUserResponse{ + User: &pb.User{ + Subject: user.Subject, + IbdUsername: user.IBDUsername, + IbdPassword: nil, + }, + }, nil +} + +func (u *Server) UpdateUser(ctx context.Context, request *pb.UpdateUserRequest) (*pb.UpdateUserResponse, error) { + request.UpdateMask.Normalize() + if !request.UpdateMask.IsValid(request.User) { + return nil, status.Errorf(codes.InvalidArgument, "invalid update mask") + } + + existingUserRes, err := u.GetUser(ctx, &pb.GetUserRequest{Subject: request.User.Subject}) + if err != nil { + return nil, err + } + existingUser := existingUserRes.User + + newUser := proto.Clone(existingUser).(*pb.User) + fmutils.Overwrite(request.User, newUser, request.UpdateMask.Paths) + + // if IDB creds are both set and are different, update them + if (newUser.IbdPassword != nil && newUser.IbdUsername != nil) && + (newUser.IbdPassword != existingUser.IbdPassword || + newUser.IbdUsername != existingUser.IbdUsername) { + // Update IBD creds + err = u.db.AddIBDCreds(ctx, newUser.Subject, *newUser.IbdUsername, *newUser.IbdPassword) + if err != nil { + return nil, status.Errorf(codes.Internal, "unable to update user: %v", err) + } + } + + newUser.IbdPassword = nil + return &pb.UpdateUserResponse{ + User: newUser, + }, nil +} diff --git a/backend/internal/server2/operations.go b/backend/internal/server2/operations.go new file mode 100644 index 0000000..c632cd1 --- /dev/null +++ b/backend/internal/server2/operations.go @@ -0,0 +1,130 @@ +package server2 + +import ( + "context" + "errors" + "fmt" + "log/slog" + "strings" + + spb "ibd-trader/api/gen/idb/stock/v1" + "ibd-trader/internal/leader/manager/ibd/scrape" + "ibd-trader/internal/redis/taskqueue" + "ibd-trader/internal/server2/idb/stock/v1" + + "cloud.google.com/go/longrunning/autogen/longrunningpb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type operationServer struct { + longrunningpb.UnimplementedOperationsServer + + scrape taskqueue.TaskQueue[scrape.TaskInfo] +} + +func newOperationServer(scrapeQueue taskqueue.TaskQueue[scrape.TaskInfo]) *operationServer { + return &operationServer{scrape: scrapeQueue} +} + +func (o *operationServer) ListOperations( + ctx context.Context, + req *longrunningpb.ListOperationsRequest, +) (*longrunningpb.ListOperationsResponse, error) { + var end taskqueue.TaskID + if req.PageToken != "" { + var err error + end, err = taskqueue.ParseTaskID(req.PageToken) + if err != nil { + return nil, status.New(codes.InvalidArgument, err.Error()).Err() + } + } else { + end = taskqueue.TaskID{} + } + + switch req.Name { + case stock.ScrapeOperationPrefix: + tasks, err := o.scrape.List(ctx, taskqueue.TaskID{}, end, int64(req.PageSize)) + if err != nil { + return nil, status.New(codes.Internal, "unable to list IDs").Err() + } + + ops := make([]*longrunningpb.Operation, len(tasks)) + for i, task := range tasks { + ops[i] = &longrunningpb.Operation{ + Name: fmt.Sprintf("%s/%s", stock.ScrapeOperationPrefix, task.ID.String()), + Metadata: new(anypb.Any), + Done: task.Done, + Result: nil, + } + err = ops[i].Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{ + Symbol: task.Data.Symbol, + StartTime: timestamppb.New(task.ID.Timestamp()), + }) + if err != nil { + return nil, status.New(codes.Internal, "unable to marshal metadata").Err() + } + + if task.Done && task.Error != "" { + s := status.New(codes.Unknown, task.Error) + ops[i].Result = &longrunningpb.Operation_Error{Error: s.Proto()} + } + } + + var nextPageToken string + if len(tasks) == int(req.PageSize) { + nextPageToken = tasks[len(tasks)-1].ID.String() + } else { + nextPageToken = "" + } + + return &longrunningpb.ListOperationsResponse{ + Operations: ops, + NextPageToken: nextPageToken, + }, nil + default: + return nil, status.New(codes.NotFound, "unknown operation type").Err() + } +} + +func (o *operationServer) GetOperation(ctx context.Context, req *longrunningpb.GetOperationRequest) (*longrunningpb.Operation, error) { + prefix, id, ok := strings.Cut(req.Name, "/") + if !ok || prefix == "" || id == "" { + return nil, status.New(codes.InvalidArgument, "invalid operation name").Err() + } + + taskID, err := taskqueue.ParseTaskID(id) + if err != nil { + return nil, status.New(codes.InvalidArgument, err.Error()).Err() + } + + switch prefix { + case stock.ScrapeOperationPrefix: + task, err := o.scrape.Data(ctx, taskID) + if errors.Is(err, taskqueue.ErrTaskNotFound) { + return nil, status.New(codes.NotFound, "operation not found").Err() + } + if err != nil { + slog.ErrorContext(ctx, "unable to get operation", "error", err) + return nil, status.New(codes.Internal, "unable to get operation").Err() + } + op := &longrunningpb.Operation{ + Name: req.Name, + Metadata: new(anypb.Any), + Done: task.Done, + Result: nil, + } + err = op.Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{ + Symbol: task.Data.Symbol, + StartTime: timestamppb.New(task.ID.Timestamp()), + }) + if err != nil { + return nil, status.New(codes.Internal, "unable to marshal metadata").Err() + } + return op, nil + default: + return nil, status.New(codes.NotFound, "unknown operation type").Err() + } +} diff --git a/backend/internal/server2/server.go b/backend/internal/server2/server.go new file mode 100644 index 0000000..4731bdd --- /dev/null +++ b/backend/internal/server2/server.go @@ -0,0 +1,71 @@ +package server2 + +import ( + "context" + "fmt" + "log/slog" + "net" + + spb "ibd-trader/api/gen/idb/stock/v1" + upb "ibd-trader/api/gen/idb/user/v1" + "ibd-trader/internal/database" + "ibd-trader/internal/leader/manager/ibd/scrape" + "ibd-trader/internal/redis/taskqueue" + "ibd-trader/internal/server2/idb/stock/v1" + "ibd-trader/internal/server2/idb/user/v1" + + "cloud.google.com/go/longrunning/autogen/longrunningpb" + "github.com/redis/go-redis/v9" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +type Server struct { + s *grpc.Server + port uint16 +} + +func New( + ctx context.Context, + port uint16, + db database.Database, + rClient *redis.Client, +) (*Server, error) { + scrapeQueue, err := taskqueue.New( + ctx, + rClient, + scrape.Queue, + "grpc-server", + taskqueue.WithEncoding[scrape.TaskInfo](scrape.QueueEncoding)) + if err != nil { + return nil, err + } + + s := grpc.NewServer() + upb.RegisterUserServiceServer(s, user.New(db)) + spb.RegisterStockServiceServer(s, stock.New(db, scrapeQueue)) + longrunningpb.RegisterOperationsServer(s, newOperationServer(scrapeQueue)) + reflection.Register(s) + return &Server{s, port}, nil +} + +func (s *Server) Serve(ctx context.Context) error { + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port)) + if err != nil { + return err + } + + // Graceful shutdown + go func() { + <-ctx.Done() + slog.ErrorContext(ctx, + "Shutting down server", + "err", ctx.Err(), + "cause", context.Cause(ctx), + ) + s.s.GracefulStop() + }() + + slog.InfoContext(ctx, "Starting gRPC server", "port", s.port) + return s.s.Serve(lis) +} |