-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathserver.go
More file actions
437 lines (361 loc) · 14.1 KB
/
server.go
File metadata and controls
437 lines (361 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
// Copyright AGNTCY Contributors (https://github.com/agntcy)
// SPDX-License-Identifier: Apache-2.0
package server
import (
"context"
"fmt"
"net"
"os"
"os/signal"
"syscall"
"time"
corev1 "github.com/agntcy/dir/api/core/v1"
eventsv1 "github.com/agntcy/dir/api/events/v1"
namingv1 "github.com/agntcy/dir/api/naming/v1"
routingv1 "github.com/agntcy/dir/api/routing/v1"
searchv1 "github.com/agntcy/dir/api/search/v1"
signv1 "github.com/agntcy/dir/api/sign/v1"
storev1 "github.com/agntcy/dir/api/store/v1"
"github.com/agntcy/dir/api/version"
"github.com/agntcy/dir/server/authn"
"github.com/agntcy/dir/server/authz"
"github.com/agntcy/dir/server/config"
"github.com/agntcy/dir/server/controller"
"github.com/agntcy/dir/server/database"
"github.com/agntcy/dir/server/events"
"github.com/agntcy/dir/server/healthcheck"
"github.com/agntcy/dir/server/metrics"
grpclogging "github.com/agntcy/dir/server/middleware/logging"
grpcratelimit "github.com/agntcy/dir/server/middleware/ratelimit"
grpcrecovery "github.com/agntcy/dir/server/middleware/recovery"
"github.com/agntcy/dir/server/naming"
"github.com/agntcy/dir/server/naming/wellknown"
"github.com/agntcy/dir/server/publication"
"github.com/agntcy/dir/server/routing"
"github.com/agntcy/dir/server/store"
"github.com/agntcy/dir/server/types"
"github.com/agntcy/dir/utils/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
)
const (
// bytesToMB is the number of bytes in one megabyte (1024 * 1024). It is used
// as a divisor so that the configured gRPC message-size limits can be logged
// in MB rather than in bytes.
bytesToMB = 1024 * 1024
)
var (
// Compile-time assertion that *Server satisfies the types.API interface.
_ types.API = &Server{}
// logger is the package-level logger, obtained from logging.Logger("server").
logger = logging.Logger("server")
)
// Server bundles the gRPC server together with every backing service it
// exposes. Instances are built by New, started by start, and torn down by
// Close.
type Server struct {
// options holds the API options derived from the configuration (with the
// event bus attached in New); returned by Options.
options types.APIOptions
// store is the storage API; returned by Store.
store types.StoreAPI
// routing is the routing API; returned by Routing and stopped in Close.
routing types.RoutingAPI
// database is the database API; returned by Database.
database types.DatabaseAPI
// eventService is the event service created in New and stopped in Close.
eventService *events.Service
// authnService is nil unless authentication is enabled in the configuration.
authnService *authn.Service
// authzService is nil unless authorization is enabled in the configuration.
authzService *authz.Service
// publicationService is started in start and stopped in Close.
publicationService *publication.Service
// health is the health checker; readiness checks are added in start.
health *healthcheck.Checker
// grpcServer is the gRPC server on which all services are registered.
grpcServer *grpc.Server
// metricsServer is nil unless metrics are enabled in the configuration.
metricsServer *metrics.Server
}
// buildConnectionOptions converts the connection configuration into the gRPC
// server options that govern connection handling: stream and message-size
// limits, the connection timeout, keepalive parameters, and the keepalive
// enforcement policy.
//
// The caller is expected to place these options ahead of any interceptor
// options so that the limits apply at the lowest level. The effective values
// are logged once before the options are returned.
func buildConnectionOptions(cfg config.ConnectionConfig) []grpc.ServerOption {
	ka := cfg.Keepalive

	// Server-side keepalive: detect dead connections and rotate aged ones.
	serverParams := keepalive.ServerParameters{
		MaxConnectionIdle:     ka.MaxConnectionIdle,
		MaxConnectionAge:      ka.MaxConnectionAge,
		MaxConnectionAgeGrace: ka.MaxConnectionAgeGrace,
		Time:                  ka.Time,
		Timeout:               ka.Timeout,
	}

	// Enforcement policy: bound how aggressively clients may send pings.
	enforcement := keepalive.EnforcementPolicy{
		MinTime:             ka.MinTime,
		PermitWithoutStream: ka.PermitWithoutStream,
	}

	connectionOpts := []grpc.ServerOption{
		// Limits that keep a single connection from monopolizing resources.
		grpc.MaxConcurrentStreams(cfg.MaxConcurrentStreams),
		grpc.MaxRecvMsgSize(cfg.MaxRecvMsgSize),
		grpc.MaxSendMsgSize(cfg.MaxSendMsgSize),
		grpc.ConnectionTimeout(cfg.ConnectionTimeout),
		grpc.KeepaliveParams(serverParams),
		grpc.KeepaliveEnforcementPolicy(enforcement),
	}

	logger.Info("Connection management configured",
		"max_concurrent_streams", cfg.MaxConcurrentStreams,
		"max_recv_msg_size_mb", cfg.MaxRecvMsgSize/bytesToMB,
		"max_send_msg_size_mb", cfg.MaxSendMsgSize/bytesToMB,
		"connection_timeout", cfg.ConnectionTimeout,
		"max_connection_idle", ka.MaxConnectionIdle,
		"max_connection_age", ka.MaxConnectionAge,
		"keepalive_time", ka.Time,
		"keepalive_timeout", ka.Timeout,
	)

	return connectionOpts
}
// Run builds a server from cfg, starts it, and blocks until the context is
// cancelled or one of SIGHUP, SIGINT, SIGTERM or SIGQUIT is received. The
// server is closed before Run returns.
//
// Run always returns a non-nil error describing why it stopped: a wrapped
// construction or start-up error, the context error, or the received signal.
func Run(ctx context.Context, cfg *config.Config) error {
	// NOTE(review): nothing visible in this file ever sends on errCh, so the
	// corresponding select case below cannot currently fire. It is kept so the
	// shape of the select is unchanged.
	errCh := make(chan error)

	server, err := New(ctx, cfg)
	if err != nil {
		return fmt.Errorf("failed to create server: %w", err)
	}

	// Start server
	if err := server.start(ctx); err != nil {
		return fmt.Errorf("failed to start server: %w", err)
	}

	// Close derives its shutdown timeouts from the context it is given. When
	// Run returns because ctx was cancelled, passing ctx would make those
	// timeouts expire immediately, so a fresh context is used instead.
	defer server.Close(context.Background()) //nolint:contextcheck

	// Wait for deactivation
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	// Stop relaying signals to sigCh once Run returns.
	defer signal.Stop(sigCh)

	select {
	case <-ctx.Done():
		return fmt.Errorf("stopping server due to context cancellation: %w", ctx.Err())
	case sig := <-sigCh:
		return fmt.Errorf("stopping server due to signal: %v", sig)
	case err := <-errCh:
		return fmt.Errorf("stopping server due to error: %w", err)
	}
}
// configureOASFValidation initializes the OASF validator using the schema URL
// found in cfg.OASFAPIValidation and logs the URL on success. Any
// initialization failure is returned wrapped with context.
func configureOASFValidation(cfg *config.Config) error {
	schemaURL := cfg.OASFAPIValidation.SchemaURL

	err := corev1.InitializeValidator(schemaURL)
	if err != nil {
		return fmt.Errorf("failed to initialize OASF validator: %w", err)
	}

	logger.Info("OASF validator configured", "schema_url", schemaURL)

	return nil
}
// New builds a fully wired Server from cfg without starting it.
//
// It performs the following steps in order:
//   - initializes the OASF validator;
//   - assembles the gRPC server options: connection management, panic
//     recovery, rate limiting (if enabled), metrics (if enabled), logging,
//     authentication (if enabled) and authorization (if enabled);
//   - creates the event service, store, routing, database and publication
//     service;
//   - creates the gRPC server and registers every API service, the health
//     service and the reflection service on it;
//   - initializes gRPC metrics when a metrics server was created.
//
// The order in which options are appended to serverOpts is significant, as
// described by the inline comments.
//
// The first failing step aborts construction and its error is returned
// wrapped with context.
// NOTE(review): components created before a failing step are not stopped on
// the error path — confirm whether any of them hold resources that need to be
// released.
//
//nolint:cyclop // This function has been at the limit; refactoring is out of scope.
func New(ctx context.Context, cfg *config.Config) (*Server, error) {
logger.Debug("Creating server with config", "config", cfg, "version", version.String())
if err := configureOASFValidation(cfg); err != nil {
return nil, err
}
// Load options
options := types.NewOptions(cfg)
serverOpts := []grpc.ServerOption{}
// Add connection management options FIRST (lowest level - applies to all connections)
// This must be before interceptors to ensure connection limits protect all server components
connConfig := cfg.Connection.WithDefaults()
connectionOpts := buildConnectionOptions(connConfig)
serverOpts = append(serverOpts, connectionOpts...)
// Add panic recovery interceptors (after connection management, before other interceptors)
// This prevents server crashes from panics in handlers or other interceptors
serverOpts = append(serverOpts, grpcrecovery.ServerOptions()...)
// Add rate limiting interceptors (after recovery, before logging and auth)
// This protects authentication and other downstream processes from DDoS attacks
if cfg.RateLimit.Enabled {
rateLimitOpts, err := grpcratelimit.ServerOptions(&cfg.RateLimit)
if err != nil {
return nil, fmt.Errorf("failed to create rate limit interceptors: %w", err)
}
serverOpts = append(serverOpts, rateLimitOpts...)
logger.Info("Rate limiting enabled",
"global_rps", cfg.RateLimit.GlobalRPS,
"per_client_rps", cfg.RateLimit.PerClientRPS,
)
}
// Initialize metrics server (if enabled)
var metricsServer *metrics.Server
if cfg.Metrics.Enabled {
metricsServer = metrics.New(cfg.Metrics.Address)
// Add gRPC metrics interceptors (after recovery/rate limit, before logging)
// Metrics should capture all requests, independent of logging configuration
metricsOpts := metrics.ServerOptions()
serverOpts = append(serverOpts, metricsOpts...)
logger.Info("Metrics enabled", "address", cfg.Metrics.Address)
}
// Add gRPC logging interceptors (after metrics, before auth/authz)
grpcLogger := logging.Logger("grpc")
loggingOpts := grpclogging.ServerOptions(grpcLogger, cfg.Logging.Verbose)
serverOpts = append(serverOpts, loggingOpts...)
// Create event service first (so other services can emit events)
eventService := events.New()
safeEventBus := events.NewSafeEventBus(eventService.Bus())
// Add event bus to options for other services
options = options.WithEventBus(safeEventBus)
// Create APIs
storeAPI, err := store.New(options) //nolint:staticcheck
if err != nil {
return nil, fmt.Errorf("failed to create store: %w", err)
}
routingAPI, err := routing.New(ctx, storeAPI, options)
if err != nil {
return nil, fmt.Errorf("failed to create routing: %w", err)
}
databaseAPI, err := database.New(cfg.Database)
if err != nil {
return nil, fmt.Errorf("failed to create database API: %w", err)
}
// Create JWT authentication service if enabled
var authnService *authn.Service
if cfg.Authn.Enabled {
authnService, err = authn.New(ctx, cfg.Authn)
if err != nil {
return nil, fmt.Errorf("failed to create authn service: %w", err)
}
//nolint:contextcheck
serverOpts = append(serverOpts, authnService.GetServerOptions()...)
}
// Create authorization service if enabled; its options are appended after
// the authentication options.
var authzService *authz.Service
if cfg.Authz.Enabled {
authzService, err = authz.New(ctx, cfg.Authz)
if err != nil {
return nil, fmt.Errorf("failed to create authz service: %w", err)
}
//nolint:contextcheck
serverOpts = append(serverOpts, authzService.GetServerOptions()...)
}
// Create publication service
publicationService, err := publication.New(databaseAPI, storeAPI, routingAPI, options)
if err != nil {
return nil, fmt.Errorf("failed to create publication service: %w", err)
}
// Create a server
grpcServer := grpc.NewServer(serverOpts...)
// Create health checker
healthChecker := healthcheck.New()
// Create naming provider for naming service
wellKnownFetcher := wellknown.NewFetcher()
namingProvider := naming.NewProvider(
naming.WithWellKnownLookup(wellKnownFetcher),
)
// Register APIs
eventsv1.RegisterEventServiceServer(grpcServer, controller.NewEventsController(eventService))
storev1.RegisterStoreServiceServer(grpcServer, controller.NewStoreController(storeAPI, databaseAPI, options.EventBus()))
routingv1.RegisterRoutingServiceServer(grpcServer, controller.NewRoutingController(routingAPI, storeAPI, publicationService))
routingv1.RegisterPublicationServiceServer(grpcServer, controller.NewPublicationController(databaseAPI, options))
searchv1.RegisterSearchServiceServer(grpcServer, controller.NewSearchController(databaseAPI, storeAPI))
storev1.RegisterSyncServiceServer(grpcServer, controller.NewSyncController(databaseAPI, options))
signv1.RegisterSignServiceServer(grpcServer, controller.NewSignController(databaseAPI))
namingv1.RegisterNamingServiceServer(grpcServer, controller.NewNamingController(
storeAPI,
databaseAPI,
namingProvider,
controller.WithVerificationTTL(options.Config().Naming.GetTTL()),
))
// Register health service
healthChecker.Register(grpcServer)
// Register reflection service
reflection.Register(grpcServer)
// Initialize metrics after service registration
if metricsServer != nil {
metrics.InitializeMetrics(grpcServer, metricsServer)
logger.Info("gRPC metrics registered")
}
return &Server{
options: options,
store: storeAPI,
routing: routingAPI,
database: databaseAPI,
eventService: eventService,
authnService: authnService,
authzService: authzService,
publicationService: publicationService,
health: healthChecker,
grpcServer: grpcServer,
metricsServer: metricsServer,
}, nil
}
// Options returns the API options held by the server.
func (s Server) Options() types.APIOptions {
	return s.options
}
// Store returns the store API held by the server.
func (s Server) Store() types.StoreAPI {
	return s.store
}
// Routing returns the routing API held by the server.
func (s Server) Routing() types.RoutingAPI {
	return s.routing
}
// Database returns the database API held by the server.
func (s Server) Database() types.DatabaseAPI {
	return s.database
}
// Close gracefully shuts down all server components.
//
// Components are stopped in this order: health check monitoring, event
// service, metrics server, routing service, authn service, authz service,
// publication service, and finally the gRPC server (via GracefulStop). Each
// component is skipped when it is nil, so Close is safe to call on a partially
// populated Server. Failures are logged and do not prevent the remaining
// components from being stopped; nothing is returned to the caller.
//
// ctx is the parent of the timeouts applied to the health check (5s) and
// metrics server (10s) shutdowns; if ctx is already cancelled those shutdowns
// are given no time to complete.
//
// Complexity is acceptable for cleanup functions with independent service shutdowns.
//
//nolint:cyclop // Cleanup function requires checking each service independently
func (s Server) Close(ctx context.Context) {
	// Stop health check monitoring
	if s.health != nil {
		stopCtx, cancel := context.WithTimeout(ctx, 5*time.Second) //nolint:mnd
		defer cancel()

		if err := s.health.Stop(stopCtx); err != nil {
			logger.Error("Failed to stop health check service", "error", err)
		}
	}

	// Stop event service
	if s.eventService != nil {
		if err := s.eventService.Stop(); err != nil {
			logger.Error("Failed to stop event service", "error", err)
		}
	}

	// Stop metrics server
	if s.metricsServer != nil {
		stopCtx, cancel := context.WithTimeout(ctx, 10*time.Second) //nolint:mnd
		defer cancel()

		if err := s.metricsServer.Stop(stopCtx); err != nil {
			logger.Error("Failed to stop metrics server", "error", err)
		}
	}

	// Stop routing service (closes GossipSub, p2p server, DHT)
	if s.routing != nil {
		if err := s.routing.Stop(); err != nil {
			logger.Error("Failed to stop routing service", "error", err)
		}
	}

	// Stop authn service if running
	if s.authnService != nil {
		if err := s.authnService.Stop(); err != nil {
			logger.Error("Failed to stop authn service", "error", err)
		}
	}

	// Stop authz service if running
	if s.authzService != nil {
		if err := s.authzService.Stop(); err != nil {
			logger.Error("Failed to stop authz service", "error", err)
		}
	}

	// Stop publication service if running
	if s.publicationService != nil {
		if err := s.publicationService.Stop(); err != nil {
			logger.Error("Failed to stop publication service", "error", err)
		}
	}

	// Guarded like every other component so that closing a Server without a
	// gRPC server does not panic on a nil pointer.
	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}
}
// start brings the server online without blocking.
//
// It starts the metrics server and the publication service (each only when
// present), opens a TCP listener on the configured listen address, registers
// the readiness checks, starts health check monitoring, and finally serves
// gRPC on the listener from a background goroutine.
//
// An error is returned, wrapped with context, if any start-up step fails. An
// error returned by grpcServer.Serve in the background goroutine is only
// logged; it is not reported to the caller.
func (s Server) start(ctx context.Context) error {
	// Start metrics server
	if s.metricsServer != nil {
		if err := s.metricsServer.Start(); err != nil {
			return fmt.Errorf("failed to start metrics server: %w", err)
		}

		logger.Info("Metrics server started")
	}

	// Start publication service
	if s.publicationService != nil {
		if err := s.publicationService.Start(ctx); err != nil {
			return fmt.Errorf("failed to start publication service: %w", err)
		}

		logger.Info("Publication service started")
	}

	// Create a listener on TCP port
	addr := s.Options().Config().ListenAddress

	listen, err := net.Listen("tcp", addr) //nolint:noctx
	if err != nil {
		return fmt.Errorf("failed to listen on %s: %w", addr, err)
	}

	// Add readiness checks
	s.health.AddReadinessCheck("database", s.database.IsReady)

	// Same nil guard as for starting the publication service above.
	if s.publicationService != nil {
		s.health.AddReadinessCheck("publication", s.publicationService.IsReady)
	}

	s.health.AddReadinessCheck("store", s.store.IsReady)
	s.health.AddReadinessCheck("routing", s.routing.IsReady)

	// Start health check monitoring
	if err := s.health.Start(ctx); err != nil {
		// The listener is not handed to the gRPC server on this path, so it
		// must be closed here or the bound port would leak.
		if closeErr := listen.Close(); closeErr != nil {
			logger.Error("Failed to close listener", "error", closeErr)
		}

		return fmt.Errorf("failed to start health check monitoring: %w", err)
	}

	// Serve gRPC server in the background
	go func() {
		logger.Info("Server starting", "address", addr)

		if err := s.grpcServer.Serve(listen); err != nil {
			logger.Error("Failed to start server", "error", err)
		}
	}()

	return nil
}