diff options
Diffstat (limited to 'backend/internal/server/operations.go')
-rw-r--r-- | backend/internal/server/operations.go | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/backend/internal/server/operations.go b/backend/internal/server/operations.go new file mode 100644 index 0000000..2487427 --- /dev/null +++ b/backend/internal/server/operations.go @@ -0,0 +1,142 @@ +package server + +import ( + "context" + "errors" + "fmt" + "log/slog" + "strings" + + "cloud.google.com/go/longrunning/autogen/longrunningpb" + spb "github.com/ansg191/ibd-trader-backend/api/gen/idb/stock/v1" + "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape" + "github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue" + "github.com/ansg191/ibd-trader-backend/internal/server/idb/stock/v1" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type operationServer struct { + longrunningpb.UnimplementedOperationsServer + + scrape taskqueue.TaskQueue[scrape.TaskInfo] +} + +func newOperationServer(scrapeQueue taskqueue.TaskQueue[scrape.TaskInfo]) *operationServer { + return &operationServer{scrape: scrapeQueue} +} + +func (o *operationServer) ListOperations( + ctx context.Context, + req *longrunningpb.ListOperationsRequest, +) (*longrunningpb.ListOperationsResponse, error) { + var end taskqueue.TaskID + if req.PageToken != "" { + var err error + end, err = taskqueue.ParseTaskID(req.PageToken) + if err != nil { + return nil, status.New(codes.InvalidArgument, err.Error()).Err() + } + } else { + end = taskqueue.TaskID{} + } + + switch req.Name { + case stock.ScrapeOperationPrefix: + tasks, err := o.scrape.List(ctx, taskqueue.TaskID{}, end, int64(req.PageSize)) + if err != nil { + return nil, status.New(codes.Internal, "unable to list IDs").Err() + } + + ops := make([]*longrunningpb.Operation, len(tasks)) + for i, task := range tasks { + ops[i] = &longrunningpb.Operation{ + Name: fmt.Sprintf("%s/%s", stock.ScrapeOperationPrefix, task.ID.String()), + Metadata: new(anypb.Any), + Done: task.Result != nil, + Result: nil, + } + err = ops[i].Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{ + Symbol: task.Data.Symbol, + StartTime: timestamppb.New(task.ID.Timestamp()), + }) + if err != nil { + return nil, status.New(codes.Internal, "unable to marshal metadata").Err() + } + + switch res := task.Result.(type) { + case *taskqueue.TaskResultSuccess: + return nil, status.New(codes.Unimplemented, "not implemented").Err() + case *taskqueue.TaskResultError: + s := status.New(codes.Unknown, res.Error) + s, err = s.WithDetails( + &epb.ErrorInfo{ + Reason: "", + Domain: "", + Metadata: nil, + }) + if err != nil { + return nil, status.New(codes.Internal, "unable to marshal error details").Err() + } + ops[i].Result = &longrunningpb.Operation_Error{Error: s.Proto()} + } + } + + var nextPageToken string + if len(tasks) == int(req.PageSize) { + nextPageToken = tasks[len(tasks)-1].ID.String() + } else { + nextPageToken = "" + } + + return &longrunningpb.ListOperationsResponse{ + Operations: ops, + NextPageToken: nextPageToken, + }, nil + default: + return nil, status.New(codes.NotFound, "unknown operation type").Err() + } +} + +func (o *operationServer) GetOperation(ctx context.Context, req *longrunningpb.GetOperationRequest) (*longrunningpb.Operation, error) { + prefix, id, ok := strings.Cut(req.Name, "/") + if !ok || prefix == "" || id == "" { + return nil, status.New(codes.InvalidArgument, "invalid operation name").Err() + } + + taskID, err := taskqueue.ParseTaskID(id) + if err != nil { + return nil, status.New(codes.InvalidArgument, err.Error()).Err() + } + + switch prefix { + case stock.ScrapeOperationPrefix: + task, err := o.scrape.Data(ctx, taskID) + if errors.Is(err, taskqueue.ErrTaskNotFound) { + return nil, status.New(codes.NotFound, "operation not found").Err() + } + if err != nil { + slog.ErrorContext(ctx, "unable to get operation", "error", err) + return nil, status.New(codes.Internal, "unable to get operation").Err() + } + op := &longrunningpb.Operation{ + Name: req.Name, + Metadata: new(anypb.Any), + Done: task.Result != nil, + Result: nil, + } + err = op.Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{ + Symbol: task.Data.Symbol, + StartTime: timestamppb.New(task.ID.Timestamp()), + }) + if err != nil { + return nil, status.New(codes.Internal, "unable to marshal metadata").Err() + } + return op, nil + default: + return nil, status.New(codes.NotFound, "unknown operation type").Err() + } +} |