aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/server/operations.go
diff options
context:
space:
mode:
Diffstat (limited to 'backend/internal/server/operations.go')
-rw-r--r--backend/internal/server/operations.go142
1 files changed, 142 insertions, 0 deletions
diff --git a/backend/internal/server/operations.go b/backend/internal/server/operations.go
new file mode 100644
index 0000000..2487427
--- /dev/null
+++ b/backend/internal/server/operations.go
@@ -0,0 +1,142 @@
+package server
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "strings"
+
+ "cloud.google.com/go/longrunning/autogen/longrunningpb"
+ spb "github.com/ansg191/ibd-trader-backend/api/gen/idb/stock/v1"
+ "github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape"
+ "github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue"
+ "github.com/ansg191/ibd-trader-backend/internal/server/idb/stock/v1"
+ epb "google.golang.org/genproto/googleapis/rpc/errdetails"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/types/known/anypb"
+ "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+type operationServer struct {
+ longrunningpb.UnimplementedOperationsServer
+
+ scrape taskqueue.TaskQueue[scrape.TaskInfo]
+}
+
+func newOperationServer(scrapeQueue taskqueue.TaskQueue[scrape.TaskInfo]) *operationServer {
+ return &operationServer{scrape: scrapeQueue}
+}
+
+func (o *operationServer) ListOperations(
+ ctx context.Context,
+ req *longrunningpb.ListOperationsRequest,
+) (*longrunningpb.ListOperationsResponse, error) {
+ var end taskqueue.TaskID
+ if req.PageToken != "" {
+ var err error
+ end, err = taskqueue.ParseTaskID(req.PageToken)
+ if err != nil {
+ return nil, status.New(codes.InvalidArgument, err.Error()).Err()
+ }
+ } else {
+ end = taskqueue.TaskID{}
+ }
+
+ switch req.Name {
+ case stock.ScrapeOperationPrefix:
+ tasks, err := o.scrape.List(ctx, taskqueue.TaskID{}, end, int64(req.PageSize))
+ if err != nil {
+ return nil, status.New(codes.Internal, "unable to list IDs").Err()
+ }
+
+ ops := make([]*longrunningpb.Operation, len(tasks))
+ for i, task := range tasks {
+ ops[i] = &longrunningpb.Operation{
+ Name: fmt.Sprintf("%s/%s", stock.ScrapeOperationPrefix, task.ID.String()),
+ Metadata: new(anypb.Any),
+ Done: task.Result != nil,
+ Result: nil,
+ }
+ err = ops[i].Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{
+ Symbol: task.Data.Symbol,
+ StartTime: timestamppb.New(task.ID.Timestamp()),
+ })
+ if err != nil {
+ return nil, status.New(codes.Internal, "unable to marshal metadata").Err()
+ }
+
+ switch res := task.Result.(type) {
+ case *taskqueue.TaskResultSuccess:
+ return nil, status.New(codes.Unimplemented, "not implemented").Err()
+ case *taskqueue.TaskResultError:
+ s := status.New(codes.Unknown, res.Error)
+ s, err = s.WithDetails(
+ &epb.ErrorInfo{
+ Reason: "",
+ Domain: "",
+ Metadata: nil,
+ })
+ if err != nil {
+ return nil, status.New(codes.Internal, "unable to marshal error details").Err()
+ }
+ ops[i].Result = &longrunningpb.Operation_Error{Error: s.Proto()}
+ }
+ }
+
+ var nextPageToken string
+ if len(tasks) == int(req.PageSize) {
+ nextPageToken = tasks[len(tasks)-1].ID.String()
+ } else {
+ nextPageToken = ""
+ }
+
+ return &longrunningpb.ListOperationsResponse{
+ Operations: ops,
+ NextPageToken: nextPageToken,
+ }, nil
+ default:
+ return nil, status.New(codes.NotFound, "unknown operation type").Err()
+ }
+}
+
+func (o *operationServer) GetOperation(ctx context.Context, req *longrunningpb.GetOperationRequest) (*longrunningpb.Operation, error) {
+ prefix, id, ok := strings.Cut(req.Name, "/")
+ if !ok || prefix == "" || id == "" {
+ return nil, status.New(codes.InvalidArgument, "invalid operation name").Err()
+ }
+
+ taskID, err := taskqueue.ParseTaskID(id)
+ if err != nil {
+ return nil, status.New(codes.InvalidArgument, err.Error()).Err()
+ }
+
+ switch prefix {
+ case stock.ScrapeOperationPrefix:
+ task, err := o.scrape.Data(ctx, taskID)
+ if errors.Is(err, taskqueue.ErrTaskNotFound) {
+ return nil, status.New(codes.NotFound, "operation not found").Err()
+ }
+ if err != nil {
+ slog.ErrorContext(ctx, "unable to get operation", "error", err)
+ return nil, status.New(codes.Internal, "unable to get operation").Err()
+ }
+ op := &longrunningpb.Operation{
+ Name: req.Name,
+ Metadata: new(anypb.Any),
+ Done: task.Result != nil,
+ Result: nil,
+ }
+ err = op.Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{
+ Symbol: task.Data.Symbol,
+ StartTime: timestamppb.New(task.ID.Timestamp()),
+ })
+ if err != nil {
+ return nil, status.New(codes.Internal, "unable to marshal metadata").Err()
+ }
+ return op, nil
+ default:
+ return nil, status.New(codes.NotFound, "unknown operation type").Err()
+ }
+}