aboutsummaryrefslogtreecommitdiff
path: root/backend/internal/server/operations.go
diff options
context:
space:
mode:
authorGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-08 16:53:59 -0700
committerGravatar Anshul Gupta <ansg191@anshulg.com> 2024-08-08 16:53:59 -0700
commitf34b92ded11b07f78575ac62c260a380c468e5ea (patch)
tree8ffdc68ed0f2e253e7f9feff3aa90a1182e5946c /backend/internal/server/operations.go
parenta439618cdc8168bad617d04875697b572f3ed41d (diff)
downloadibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.gz
ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.tar.zst
ibd-trader-f34b92ded11b07f78575ac62c260a380c468e5ea.zip
Rework redis taskqueue to store task results
Diffstat (limited to 'backend/internal/server/operations.go')
-rw-r--r--backend/internal/server/operations.go21
1 files changed, 17 insertions, 4 deletions
diff --git a/backend/internal/server/operations.go b/backend/internal/server/operations.go
index dab67f4..2487427 100644
--- a/backend/internal/server/operations.go
+++ b/backend/internal/server/operations.go
@@ -12,6 +12,7 @@ import (
"github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape"
"github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue"
"github.com/ansg191/ibd-trader-backend/internal/server/idb/stock/v1"
+ epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
@@ -55,7 +56,7 @@ func (o *operationServer) ListOperations(
ops[i] = &longrunningpb.Operation{
Name: fmt.Sprintf("%s/%s", stock.ScrapeOperationPrefix, task.ID.String()),
Metadata: new(anypb.Any),
- Done: task.Done,
+ Done: task.Result != nil,
Result: nil,
}
err = ops[i].Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{
@@ -66,8 +67,20 @@ func (o *operationServer) ListOperations(
return nil, status.New(codes.Internal, "unable to marshal metadata").Err()
}
- if task.Done && task.Error != "" {
- s := status.New(codes.Unknown, task.Error)
+ switch res := task.Result.(type) {
+ case *taskqueue.TaskResultSuccess:
+ return nil, status.New(codes.Unimplemented, "not implemented").Err()
+ case *taskqueue.TaskResultError:
+ s := status.New(codes.Unknown, res.Error)
+ s, err = s.WithDetails(
+ &epb.ErrorInfo{
+ Reason: "",
+ Domain: "",
+ Metadata: nil,
+ })
+ if err != nil {
+ return nil, status.New(codes.Internal, "unable to marshal error details").Err()
+ }
ops[i].Result = &longrunningpb.Operation_Error{Error: s.Proto()}
}
}
@@ -112,7 +125,7 @@ func (o *operationServer) GetOperation(ctx context.Context, req *longrunningpb.G
op := &longrunningpb.Operation{
Name: req.Name,
Metadata: new(anypb.Any),
- Done: task.Done,
+ Done: task.Result != nil,
Result: nil,
}
err = op.Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{