aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/server2
diff options
context:
space:
mode:
Diffstat (limited to 'backend/internal/server2')
-rw-r--r--backend/internal/server2/idb/stock/v1/stock.go63
-rw-r--r--backend/internal/server2/idb/user/v1/user.go94
-rw-r--r--backend/internal/server2/operations.go130
-rw-r--r--backend/internal/server2/server.go71
4 files changed, 358 insertions, 0 deletions
diff --git a/backend/internal/server2/idb/stock/v1/stock.go b/backend/internal/server2/idb/stock/v1/stock.go
new file mode 100644
index 0000000..3a94c82
--- /dev/null
+++ b/backend/internal/server2/idb/stock/v1/stock.go
@@ -0,0 +1,63 @@
+package stock
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+
+ pb "ibd-trader/api/gen/idb/stock/v1"
+ "ibd-trader/internal/database"
+ "ibd-trader/internal/leader/manager/ibd/scrape"
+ "ibd-trader/internal/redis/taskqueue"
+
+ "cloud.google.com/go/longrunning/autogen/longrunningpb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/types/known/anypb"
+ "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+const ScrapeOperationPrefix = "scrape"
+
+type Server struct {
+ pb.UnimplementedStockServiceServer
+
+ db database.StockStore
+ queue taskqueue.TaskQueue[scrape.TaskInfo]
+}
+
+func New(db database.StockStore, queue taskqueue.TaskQueue[scrape.TaskInfo]) *Server {
+ return &Server{db: db, queue: queue}
+}
+
+func (s *Server) CreateStock(ctx context.Context, request *pb.CreateStockRequest) (*longrunningpb.Operation, error) {
+ task, err := s.queue.Enqueue(ctx, scrape.TaskInfo{Symbol: request.Symbol})
+ if err != nil {
+ slog.ErrorContext(ctx, "failed to enqueue task", "err", err)
+ return nil, status.New(codes.Internal, "failed to enqueue task").Err()
+ }
+ op := &longrunningpb.Operation{
+ Name: fmt.Sprintf("%s/%s", ScrapeOperationPrefix, task.ID.String()),
+ Metadata: new(anypb.Any),
+ Done: false,
+ Result: nil,
+ }
+ err = op.Metadata.MarshalFrom(&pb.StockScrapeOperationMetadata{
+ Symbol: request.Symbol,
+ StartTime: timestamppb.New(task.ID.Timestamp()),
+ })
+ if err != nil {
+ slog.ErrorContext(ctx, "failed to marshal metadata", "err", err)
+ return nil, status.New(codes.Internal, "failed to marshal metadata").Err()
+ }
+ return op, nil
+}
+
+func (s *Server) GetStock(ctx context.Context, request *pb.GetStockRequest) (*pb.GetStockResponse, error) {
+
+}
+
+func (s *Server) ListStocks(ctx context.Context, request *pb.ListStocksRequest) (*pb.ListStocksResponse, error) {
+ //TODO implement me
+ panic("implement me")
+}
diff --git a/backend/internal/server2/idb/user/v1/user.go b/backend/internal/server2/idb/user/v1/user.go
new file mode 100644
index 0000000..1866944
--- /dev/null
+++ b/backend/internal/server2/idb/user/v1/user.go
@@ -0,0 +1,94 @@
+package user
+
+import (
+ "context"
+ "errors"
+
+ pb "ibd-trader/api/gen/idb/user/v1"
+ "ibd-trader/internal/database"
+
+ "github.com/mennanov/fmutils"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+)
+
+type Server struct {
+ pb.UnimplementedUserServiceServer
+
+ db database.UserStore
+}
+
+func New(db database.UserStore) *Server {
+ return &Server{db: db}
+}
+
+func (u *Server) CreateUser(ctx context.Context, request *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
+ err := u.db.AddUser(ctx, request.Subject)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "unable to create user: %v", err)
+ }
+
+ user, err := u.db.GetUser(ctx, request.Subject)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "unable to get user: %v", err)
+ }
+
+ return &pb.CreateUserResponse{
+ User: &pb.User{
+ Subject: user.Subject,
+ IbdUsername: user.IBDUsername,
+ IbdPassword: nil,
+ },
+ }, nil
+}
+
+func (u *Server) GetUser(ctx context.Context, request *pb.GetUserRequest) (*pb.GetUserResponse, error) {
+ user, err := u.db.GetUser(ctx, request.Subject)
+ if errors.Is(err, database.ErrUserNotFound) {
+ return nil, status.New(codes.NotFound, "user not found").Err()
+ }
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "unable to get user: %v", err)
+ }
+
+ return &pb.GetUserResponse{
+ User: &pb.User{
+ Subject: user.Subject,
+ IbdUsername: user.IBDUsername,
+ IbdPassword: nil,
+ },
+ }, nil
+}
+
+func (u *Server) UpdateUser(ctx context.Context, request *pb.UpdateUserRequest) (*pb.UpdateUserResponse, error) {
+ request.UpdateMask.Normalize()
+ if !request.UpdateMask.IsValid(request.User) {
+ return nil, status.Errorf(codes.InvalidArgument, "invalid update mask")
+ }
+
+ existingUserRes, err := u.GetUser(ctx, &pb.GetUserRequest{Subject: request.User.Subject})
+ if err != nil {
+ return nil, err
+ }
+ existingUser := existingUserRes.User
+
+ newUser := proto.Clone(existingUser).(*pb.User)
+ fmutils.Overwrite(request.User, newUser, request.UpdateMask.Paths)
+
+ // if IDB creds are both set and are different, update them
+ if (newUser.IbdPassword != nil && newUser.IbdUsername != nil) &&
+ (newUser.IbdPassword != existingUser.IbdPassword ||
+ newUser.IbdUsername != existingUser.IbdUsername) {
+ // Update IBD creds
+ err = u.db.AddIBDCreds(ctx, newUser.Subject, *newUser.IbdUsername, *newUser.IbdPassword)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "unable to update user: %v", err)
+ }
+ }
+
+ newUser.IbdPassword = nil
+ return &pb.UpdateUserResponse{
+ User: newUser,
+ }, nil
+}
diff --git a/backend/internal/server2/operations.go b/backend/internal/server2/operations.go
new file mode 100644
index 0000000..c632cd1
--- /dev/null
+++ b/backend/internal/server2/operations.go
@@ -0,0 +1,130 @@
+package server2
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "strings"
+
+ spb "ibd-trader/api/gen/idb/stock/v1"
+ "ibd-trader/internal/leader/manager/ibd/scrape"
+ "ibd-trader/internal/redis/taskqueue"
+ "ibd-trader/internal/server2/idb/stock/v1"
+
+ "cloud.google.com/go/longrunning/autogen/longrunningpb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/types/known/anypb"
+ "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+type operationServer struct {
+ longrunningpb.UnimplementedOperationsServer
+
+ scrape taskqueue.TaskQueue[scrape.TaskInfo]
+}
+
+func newOperationServer(scrapeQueue taskqueue.TaskQueue[scrape.TaskInfo]) *operationServer {
+ return &operationServer{scrape: scrapeQueue}
+}
+
+func (o *operationServer) ListOperations(
+ ctx context.Context,
+ req *longrunningpb.ListOperationsRequest,
+) (*longrunningpb.ListOperationsResponse, error) {
+ var end taskqueue.TaskID
+ if req.PageToken != "" {
+ var err error
+ end, err = taskqueue.ParseTaskID(req.PageToken)
+ if err != nil {
+ return nil, status.New(codes.InvalidArgument, err.Error()).Err()
+ }
+ } else {
+ end = taskqueue.TaskID{}
+ }
+
+ switch req.Name {
+ case stock.ScrapeOperationPrefix:
+ tasks, err := o.scrape.List(ctx, taskqueue.TaskID{}, end, int64(req.PageSize))
+ if err != nil {
+ return nil, status.New(codes.Internal, "unable to list IDs").Err()
+ }
+
+ ops := make([]*longrunningpb.Operation, len(tasks))
+ for i, task := range tasks {
+ ops[i] = &longrunningpb.Operation{
+ Name: fmt.Sprintf("%s/%s", stock.ScrapeOperationPrefix, task.ID.String()),
+ Metadata: new(anypb.Any),
+ Done: task.Done,
+ Result: nil,
+ }
+ err = ops[i].Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{
+ Symbol: task.Data.Symbol,
+ StartTime: timestamppb.New(task.ID.Timestamp()),
+ })
+ if err != nil {
+ return nil, status.New(codes.Internal, "unable to marshal metadata").Err()
+ }
+
+ if task.Done && task.Error != "" {
+ s := status.New(codes.Unknown, task.Error)
+ ops[i].Result = &longrunningpb.Operation_Error{Error: s.Proto()}
+ }
+ }
+
+ var nextPageToken string
+ if len(tasks) == int(req.PageSize) {
+ nextPageToken = tasks[len(tasks)-1].ID.String()
+ } else {
+ nextPageToken = ""
+ }
+
+ return &longrunningpb.ListOperationsResponse{
+ Operations: ops,
+ NextPageToken: nextPageToken,
+ }, nil
+ default:
+ return nil, status.New(codes.NotFound, "unknown operation type").Err()
+ }
+}
+
+func (o *operationServer) GetOperation(ctx context.Context, req *longrunningpb.GetOperationRequest) (*longrunningpb.Operation, error) {
+ prefix, id, ok := strings.Cut(req.Name, "/")
+ if !ok || prefix == "" || id == "" {
+ return nil, status.New(codes.InvalidArgument, "invalid operation name").Err()
+ }
+
+ taskID, err := taskqueue.ParseTaskID(id)
+ if err != nil {
+ return nil, status.New(codes.InvalidArgument, err.Error()).Err()
+ }
+
+ switch prefix {
+ case stock.ScrapeOperationPrefix:
+ task, err := o.scrape.Data(ctx, taskID)
+ if errors.Is(err, taskqueue.ErrTaskNotFound) {
+ return nil, status.New(codes.NotFound, "operation not found").Err()
+ }
+ if err != nil {
+ slog.ErrorContext(ctx, "unable to get operation", "error", err)
+ return nil, status.New(codes.Internal, "unable to get operation").Err()
+ }
+ op := &longrunningpb.Operation{
+ Name: req.Name,
+ Metadata: new(anypb.Any),
+ Done: task.Done,
+ Result: nil,
+ }
+ err = op.Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{
+ Symbol: task.Data.Symbol,
+ StartTime: timestamppb.New(task.ID.Timestamp()),
+ })
+ if err != nil {
+ return nil, status.New(codes.Internal, "unable to marshal metadata").Err()
+ }
+ return op, nil
+ default:
+ return nil, status.New(codes.NotFound, "unknown operation type").Err()
+ }
+}
diff --git a/backend/internal/server2/server.go b/backend/internal/server2/server.go
new file mode 100644
index 0000000..4731bdd
--- /dev/null
+++ b/backend/internal/server2/server.go
@@ -0,0 +1,71 @@
+package server2
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "net"
+
+ spb "ibd-trader/api/gen/idb/stock/v1"
+ upb "ibd-trader/api/gen/idb/user/v1"
+ "ibd-trader/internal/database"
+ "ibd-trader/internal/leader/manager/ibd/scrape"
+ "ibd-trader/internal/redis/taskqueue"
+ "ibd-trader/internal/server2/idb/stock/v1"
+ "ibd-trader/internal/server2/idb/user/v1"
+
+ "cloud.google.com/go/longrunning/autogen/longrunningpb"
+ "github.com/redis/go-redis/v9"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
+)
+
+type Server struct {
+ s *grpc.Server
+ port uint16
+}
+
+func New(
+ ctx context.Context,
+ port uint16,
+ db database.Database,
+ rClient *redis.Client,
+) (*Server, error) {
+ scrapeQueue, err := taskqueue.New(
+ ctx,
+ rClient,
+ scrape.Queue,
+ "grpc-server",
+ taskqueue.WithEncoding[scrape.TaskInfo](scrape.QueueEncoding))
+ if err != nil {
+ return nil, err
+ }
+
+ s := grpc.NewServer()
+ upb.RegisterUserServiceServer(s, user.New(db))
+ spb.RegisterStockServiceServer(s, stock.New(db, scrapeQueue))
+ longrunningpb.RegisterOperationsServer(s, newOperationServer(scrapeQueue))
+ reflection.Register(s)
+ return &Server{s, port}, nil
+}
+
+func (s *Server) Serve(ctx context.Context) error {
+ lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))
+ if err != nil {
+ return err
+ }
+
+ // Graceful shutdown
+ go func() {
+ <-ctx.Done()
+ slog.ErrorContext(ctx,
+ "Shutting down server",
+ "err", ctx.Err(),
+ "cause", context.Cause(ctx),
+ )
+ s.s.GracefulStop()
+ }()
+
+ slog.InfoContext(ctx, "Starting gRPC server", "port", s.port)
+ return s.s.Serve(lis)
+}