package server

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"

	"cloud.google.com/go/longrunning/autogen/longrunningpb"
	spb "github.com/ansg191/ibd-trader/backend/api/idb/stock/v1"
	"github.com/ansg191/ibd-trader/backend/internal/leader/manager/ibd/scrape"
	"github.com/ansg191/ibd-trader/backend/internal/redis/taskqueue"
	"github.com/ansg191/ibd-trader/backend/internal/server/idb/stock/v1"
	epb "google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/anypb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

// operationServer implements the long-running Operations gRPC service on top
// of the scrape task queue. RPCs it does not override fall through to the
// embedded UnimplementedOperationsServer.
type operationServer struct {
	longrunningpb.UnimplementedOperationsServer

	// scrape is the queue holding stock scrape tasks; each task is exposed as
	// one operation.
	scrape taskqueue.TaskQueue[scrape.TaskInfo]
}

// newOperationServer returns an operationServer backed by scrapeQueue.
func newOperationServer(scrapeQueue taskqueue.TaskQueue[scrape.TaskInfo]) *operationServer {
	return &operationServer{scrape: scrapeQueue}
}

// taskErrorResult converts a task's error message into the Operation error
// result: a status with code Unknown carrying an (empty) ErrorInfo detail.
// It returns an error only if the detail cannot be attached to the status.
func taskErrorResult(msg string) (*longrunningpb.Operation_Error, error) {
	s, err := status.New(codes.Unknown, msg).WithDetails(&epb.ErrorInfo{})
	if err != nil {
		return nil, err
	}
	return &longrunningpb.Operation_Error{Error: s.Proto()}, nil
}

// ListOperations lists the operations under the collection named by req.Name.
//
// Only stock.ScrapeOperationPrefix is a known collection; any other name
// yields NotFound. req.PageToken, when non-empty, must parse as a task ID
// (InvalidArgument otherwise) and is passed to the queue as the end bound of
// the listing; req.PageSize is passed as the maximum number of tasks.
//
// Each task becomes one Operation named "<prefix>/<task ID>" with
// StockScrapeOperationMetadata as metadata. Failed tasks carry an error
// result. A task with a success result currently aborts the whole call with
// Unimplemented, because no response encoding exists yet.
func (o *operationServer) ListOperations(
	ctx context.Context,
	req *longrunningpb.ListOperationsRequest,
) (*longrunningpb.ListOperationsResponse, error) {
	// Without a page token the zero TaskID is used as the end bound.
	var end taskqueue.TaskID
	if req.PageToken != "" {
		var err error
		end, err = taskqueue.ParseTaskID(req.PageToken)
		if err != nil {
			return nil, status.New(codes.InvalidArgument, err.Error()).Err()
		}
	}

	switch req.Name {
	case stock.ScrapeOperationPrefix:
		tasks, err := o.scrape.List(ctx, taskqueue.TaskID{}, end, int64(req.PageSize))
		if err != nil {
			// The client only gets a generic message, so keep the cause in the log.
			slog.ErrorContext(ctx, "unable to list operations", "error", err)
			return nil, status.New(codes.Internal, "unable to list IDs").Err()
		}

		ops := make([]*longrunningpb.Operation, len(tasks))
		for i, task := range tasks {
			ops[i] = &longrunningpb.Operation{
				Name:     fmt.Sprintf("%s/%s", stock.ScrapeOperationPrefix, task.ID.String()),
				Metadata: new(anypb.Any),
				Done:     task.Result != nil,
			}
			err = ops[i].Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{
				Symbol:    task.Data.Symbol,
				StartTime: timestamppb.New(task.ID.Timestamp()),
			})
			if err != nil {
				return nil, status.New(codes.Internal, "unable to marshal metadata").Err()
			}

			switch res := task.Result.(type) {
			case *taskqueue.TaskResultSuccess:
				// NOTE(review): one successful task fails the entire listing;
				// kept as-is until a success response encoding is defined.
				return nil, status.New(codes.Unimplemented, "not implemented").Err()
			case *taskqueue.TaskResultError:
				opErr, err := taskErrorResult(res.Error)
				if err != nil {
					return nil, status.New(codes.Internal, "unable to marshal error details").Err()
				}
				ops[i].Result = opErr
			}
		}

		// A full page suggests more tasks may follow, so hand back the last
		// ID as the next token. The n > 0 guard matters when PageSize is 0
		// and the queue is empty: indexing tasks[n-1] would panic otherwise.
		var nextPageToken string
		if n := len(tasks); n > 0 && n == int(req.PageSize) {
			nextPageToken = tasks[n-1].ID.String()
		}

		return &longrunningpb.ListOperationsResponse{
			Operations:    ops,
			NextPageToken: nextPageToken,
		}, nil
	default:
		return nil, status.New(codes.NotFound, "unknown operation type").Err()
	}
}

// GetOperation returns the current state of the operation named req.Name.
//
// The name must have the form "<prefix>/<task ID>"; a malformed name or an
// unparsable task ID yields InvalidArgument. An unknown prefix or a task the
// queue does not have yields NotFound; any other queue failure is logged and
// reported as Internal.
//
// The returned Operation carries StockScrapeOperationMetadata and, when the
// task failed, the error result. Successful tasks are reported as Done with
// no result, since no response encoding exists yet.
func (o *operationServer) GetOperation(ctx context.Context, req *longrunningpb.GetOperationRequest) (*longrunningpb.Operation, error) {
	// NOTE(review): the name is split at the FIRST "/", so this only matches
	// if stock.ScrapeOperationPrefix itself contains no "/" — confirm.
	prefix, id, ok := strings.Cut(req.Name, "/")
	if !ok || prefix == "" || id == "" {
		return nil, status.New(codes.InvalidArgument, "invalid operation name").Err()
	}

	taskID, err := taskqueue.ParseTaskID(id)
	if err != nil {
		return nil, status.New(codes.InvalidArgument, err.Error()).Err()
	}

	switch prefix {
	case stock.ScrapeOperationPrefix:
		task, err := o.scrape.Data(ctx, taskID)
		if errors.Is(err, taskqueue.ErrTaskNotFound) {
			return nil, status.New(codes.NotFound, "operation not found").Err()
		}
		if err != nil {
			slog.ErrorContext(ctx, "unable to get operation", "error", err)
			return nil, status.New(codes.Internal, "unable to get operation").Err()
		}

		op := &longrunningpb.Operation{
			Name:     req.Name,
			Metadata: new(anypb.Any),
			Done:     task.Result != nil,
		}
		err = op.Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{
			Symbol:    task.Data.Symbol,
			StartTime: timestamppb.New(task.ID.Timestamp()),
		})
		if err != nil {
			return nil, status.New(codes.Internal, "unable to marshal metadata").Err()
		}

		// Report failures the same way ListOperations does, so a client
		// polling a single operation can tell a failed task from a
		// successful one.
		switch res := task.Result.(type) {
		case *taskqueue.TaskResultError:
			opErr, err := taskErrorResult(res.Error)
			if err != nil {
				return nil, status.New(codes.Internal, "unable to marshal error details").Err()
			}
			op.Result = opErr
		}

		return op, nil
	default:
		return nil, status.New(codes.NotFound, "unknown operation type").Err()
	}
}