1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
package server
import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"cloud.google.com/go/longrunning/autogen/longrunningpb"
spb "github.com/ansg191/ibd-trader-backend/api/gen/idb/stock/v1"
"github.com/ansg191/ibd-trader-backend/internal/leader/manager/ibd/scrape"
"github.com/ansg191/ibd-trader-backend/internal/redis/taskqueue"
"github.com/ansg191/ibd-trader-backend/internal/server/idb/stock/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)
type operationServer struct {
longrunningpb.UnimplementedOperationsServer
scrape taskqueue.TaskQueue[scrape.TaskInfo]
}
func newOperationServer(scrapeQueue taskqueue.TaskQueue[scrape.TaskInfo]) *operationServer {
return &operationServer{scrape: scrapeQueue}
}
func (o *operationServer) ListOperations(
ctx context.Context,
req *longrunningpb.ListOperationsRequest,
) (*longrunningpb.ListOperationsResponse, error) {
var end taskqueue.TaskID
if req.PageToken != "" {
var err error
end, err = taskqueue.ParseTaskID(req.PageToken)
if err != nil {
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
}
} else {
end = taskqueue.TaskID{}
}
switch req.Name {
case stock.ScrapeOperationPrefix:
tasks, err := o.scrape.List(ctx, taskqueue.TaskID{}, end, int64(req.PageSize))
if err != nil {
return nil, status.New(codes.Internal, "unable to list IDs").Err()
}
ops := make([]*longrunningpb.Operation, len(tasks))
for i, task := range tasks {
ops[i] = &longrunningpb.Operation{
Name: fmt.Sprintf("%s/%s", stock.ScrapeOperationPrefix, task.ID.String()),
Metadata: new(anypb.Any),
Done: task.Done,
Result: nil,
}
err = ops[i].Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{
Symbol: task.Data.Symbol,
StartTime: timestamppb.New(task.ID.Timestamp()),
})
if err != nil {
return nil, status.New(codes.Internal, "unable to marshal metadata").Err()
}
if task.Done && task.Error != "" {
s := status.New(codes.Unknown, task.Error)
ops[i].Result = &longrunningpb.Operation_Error{Error: s.Proto()}
}
}
var nextPageToken string
if len(tasks) == int(req.PageSize) {
nextPageToken = tasks[len(tasks)-1].ID.String()
} else {
nextPageToken = ""
}
return &longrunningpb.ListOperationsResponse{
Operations: ops,
NextPageToken: nextPageToken,
}, nil
default:
return nil, status.New(codes.NotFound, "unknown operation type").Err()
}
}
func (o *operationServer) GetOperation(ctx context.Context, req *longrunningpb.GetOperationRequest) (*longrunningpb.Operation, error) {
prefix, id, ok := strings.Cut(req.Name, "/")
if !ok || prefix == "" || id == "" {
return nil, status.New(codes.InvalidArgument, "invalid operation name").Err()
}
taskID, err := taskqueue.ParseTaskID(id)
if err != nil {
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
}
switch prefix {
case stock.ScrapeOperationPrefix:
task, err := o.scrape.Data(ctx, taskID)
if errors.Is(err, taskqueue.ErrTaskNotFound) {
return nil, status.New(codes.NotFound, "operation not found").Err()
}
if err != nil {
slog.ErrorContext(ctx, "unable to get operation", "error", err)
return nil, status.New(codes.Internal, "unable to get operation").Err()
}
op := &longrunningpb.Operation{
Name: req.Name,
Metadata: new(anypb.Any),
Done: task.Done,
Result: nil,
}
err = op.Metadata.MarshalFrom(&spb.StockScrapeOperationMetadata{
Symbol: task.Data.Symbol,
StartTime: timestamppb.New(task.ID.Timestamp()),
})
if err != nil {
return nil, status.New(codes.Internal, "unable to marshal metadata").Err()
}
return op, nil
default:
return nil, status.New(codes.NotFound, "unknown operation type").Err()
}
}
|