1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
package server
import (
"context"
"fmt"
"log/slog"
"net"
"cloud.google.com/go/longrunning/autogen/longrunningpb"
spb "github.com/ansg191/ibd-trader/backend/api/idb/stock/v1"
upb "github.com/ansg191/ibd-trader/backend/api/idb/user/v1"
"github.com/ansg191/ibd-trader/backend/internal/database"
"github.com/ansg191/ibd-trader/backend/internal/ibd"
"github.com/ansg191/ibd-trader/backend/internal/keys"
"github.com/ansg191/ibd-trader/backend/internal/leader/manager/ibd/scrape"
"github.com/ansg191/ibd-trader/backend/internal/redis/taskqueue"
"github.com/ansg191/ibd-trader/backend/internal/server/idb/stock/v1"
"github.com/ansg191/ibd-trader/backend/internal/server/idb/user/v1"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
//go:generate make -C ../../../api/ generate-go
type Server struct {
s *grpc.Server
port uint16
}
func New(
ctx context.Context,
port uint16,
db database.TransactionExecutor,
rClient *redis.Client,
client *ibd.Client,
kms keys.KeyManagementService,
keyName string,
) (*Server, error) {
scrapeQueue, err := taskqueue.New(
ctx,
rClient,
scrape.Queue,
"grpc-server",
taskqueue.WithEncoding[scrape.TaskInfo](scrape.QueueEncoding))
if err != nil {
return nil, err
}
s := grpc.NewServer()
upb.RegisterUserServiceServer(s, user.New(db, kms, keyName, client))
spb.RegisterStockServiceServer(s, stock.New(db, scrapeQueue))
longrunningpb.RegisterOperationsServer(s, newOperationServer(scrapeQueue))
reflection.Register(s)
return &Server{s, port}, nil
}
func (s *Server) Serve(ctx context.Context) error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))
if err != nil {
return err
}
// Graceful shutdown
go func() {
<-ctx.Done()
slog.ErrorContext(ctx,
"Shutting down server",
"err", ctx.Err(),
"cause", context.Cause(ctx),
)
s.s.GracefulStop()
}()
slog.InfoContext(ctx, "Starting gRPC server", "port", s.port)
return s.s.Serve(lis)
}
|