From 88a35bfe4e3e27be065b755fe75e3260eadb84b4 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Thu, 2 Jun 2022 16:57:50 +0300 Subject: [PATCH 1/4] CFS-2672 | feat: started working on component tests --- go.mod | 6 + go.sum | 13 ++ test/allocator/main.go | 157 +++++++++++++ test/allocator/perf/client.go | 142 ++++++++++++ test/allocator/perf/config.go | 48 ++++ test/allocator/perf/config.json | 8 + test/allocator/perf/doc.go | 2 + test/allocator/schema/allocator.pb.go | 244 +++++++++++++++++++++ test/allocator/schema/allocator.proto | 26 +++ test/allocator/schema/allocator_grpc.pb.go | 102 +++++++++ test/allocator/schema/generate.sh | 12 + test/allocator/server/config.go | 26 +++ test/allocator/server/config.json | 17 ++ test/allocator/server/doc.go | 2 + test/allocator/server/server.go | 116 ++++++++++ test/infra/allocator/Dockerfile | 6 + test/infra/docker-compose.yml | 40 ++++ test/infra/prepare.sh | 35 +++ 18 files changed, 1002 insertions(+) create mode 100644 test/allocator/main.go create mode 100644 test/allocator/perf/client.go create mode 100644 test/allocator/perf/config.go create mode 100644 test/allocator/perf/config.json create mode 100644 test/allocator/perf/doc.go create mode 100644 test/allocator/schema/allocator.pb.go create mode 100644 test/allocator/schema/allocator.proto create mode 100644 test/allocator/schema/allocator_grpc.pb.go create mode 100755 test/allocator/schema/generate.sh create mode 100644 test/allocator/server/config.go create mode 100644 test/allocator/server/config.json create mode 100644 test/allocator/server/doc.go create mode 100644 test/allocator/server/server.go create mode 100644 test/infra/allocator/Dockerfile create mode 100644 test/infra/docker-compose.yml create mode 100755 test/infra/prepare.sh diff --git a/go.mod b/go.mod index 45cec59..52ca707 100644 --- a/go.mod +++ b/go.mod @@ -13,15 +13,21 @@ require ( ) require ( + github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.6 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/stretchr/objx v0.2.0 // indirect + github.com/urfave/cli/v2 v2.8.1 // indirect + github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect golang.org/x/sys v0.0.0-20220317061510-51cd9980dadf // indirect golang.org/x/text v0.3.7 // indirect + golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect google.golang.org/genproto v0.0.0-20210608205507-b6d2f5bf0d7d // indirect google.golang.org/protobuf v1.26.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect diff --git a/go.sum b/go.sum index 06c9dec..ebd5a82 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= +github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -16,8 +18,11 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -71,6 +76,8 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= @@ -80,10 +87,14 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tsuna/endian v0.0.0-20151020052604-29b3a4178852 h1:/HMzghBx/U8ZTQ+CCKRAsjeNNV12OCG3PfJcthNMBU0= github.com/tsuna/endian v0.0.0-20151020052604-29b3a4178852/go.mod h1:7SvkOZYNBtjd5XUi2fuPMvAZS8rlCMaU69hj/3joIsE= +github.com/urfave/cli/v2 v2.8.1 h1:CGuYNZF9IKZY/rfBe3lJpccSoIY1ytfvmgQT90cNOl4= +github.com/urfave/cli/v2 v2.8.1/go.mod h1:Z41J9TPoffeoqP0Iza0YbAhGvymRdZAd2uPmZ5JxRdY= github.com/valyala/fastrand v1.0.0 h1:LUKT9aKer2dVQNUi3waewTbKV+7H17kvWFNKs2ObdkI= github.com/valyala/fastrand v1.0.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/villenny/fastrand64-go v0.0.0-20201008161821-3d8fa521c558 h1:oNwFCUPi4ns2fMuaBtzMdQImdt25neDPJPBTNprmdF8= github.com/villenny/fastrand64-go v0.0.0-20201008161821-3d8fa521c558/go.mod h1:0KogUQQf0cFYfgnOpYJqw1RnSb4S1oJwUb1CEpGJLJ4= +github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= +github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/yalue/native_endian v0.0.0-20180607135909-51013b03be4f h1:nsQCScpQ8RRf+wIooqfyyEUINV2cAPuo2uVtHSBbA4M= github.com/yalue/native_endian v0.0.0-20180607135909-51013b03be4f/go.mod h1:1cm5YQZdnDQBZVtFG2Ip8sFVN0eYZ8OFkCT2kIVl9mw= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -141,6 +152,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w= +golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/test/allocator/main.go b/test/allocator/main.go new file mode 100644 index 0000000..c618bb3 --- /dev/null +++ b/test/allocator/main.go @@ -0,0 +1,157 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "log" + "os" + "os/signal" + "path/filepath" + "syscall" + + "github.com/go-logr/logr" + "github.com/go-logr/stdr" + "github.com/pkg/errors" + "github.com/urfave/cli/v2" + + "github.com/newcloudtechnologies/memlimiter/test/allocator/perf" + "github.com/newcloudtechnologies/memlimiter/test/allocator/server" +) + +func main() { + logger := stdr.NewWithOptions( + log.New(os.Stdout, "", log.LstdFlags), + stdr.Options{LogCaller: stdr.All}, + ) + + app := &cli.App{ + Name: "allocator", + Usage: "test application for memlimiter", + Commands: cli.Commands{ + &cli.Command{ + Name: "server", + Usage: "allocator server app", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Usage: "configuration file", + Aliases: []string{"c"}, + Required: true, + }, + }, + Action: func(context *cli.Context) error { return actionServer(logger, context) }, + }, + &cli.Command{ + Name: "perf", + Usage: "allocator perf client", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Usage: "configuration file", + Aliases: []string{"c"}, + Required: true, + }, + }, + Action: actionPerf, + }, + }, + } + + if err := app.Run(os.Args); err != nil { + logger.Error(err, "application run") + os.Exit(1) + } +} + +func actionServer(logger logr.Logger, c *cli.Context) error { + srv, err := makeServer(logger, c) + if err != nil { + return errors.Wrap(err, "make server") + } + + if err := runAndWaitSignal(srv); err != nil { + return errors.Wrap(err, "run and wait signal") + } + + return nil +} + +func makeServer(logger logr.Logger, c *cli.Context) (server.Server, error) { + filename := c.String("config") + + data, err := ioutil.ReadFile(filepath.Clean(filename)) + if err != nil { + return nil, errors.Wrap(err, "ioutil readfile") + } + + cfg := &server.Config{} + + if err = json.Unmarshal(data, cfg); err != nil { + return nil, errors.Wrap(err, "unmarshal") + } + + srv, err := server.NewAllocatorServer(logger, cfg) + if err != nil { + return nil, errors.Wrap(err, "new allocator server") + } + + return srv, nil +} + +func actionPerf(c *cli.Context) error { + perfClient, err := makePerf(c) + if err != nil { + return errors.Wrap(err, "make perf") + } + + if err := runAndWaitSignal(perfClient); err != nil { + return errors.Wrap(err, "run and wait signal") + } + + return nil +} + +func makePerf(c *cli.Context) (*perf.Client, error) { + filename := c.String("config") + + data, err := ioutil.ReadFile(filepath.Clean(filename)) + if err != nil { + return nil, errors.Wrap(err, "ioutil readfile") + } + + cfg := &perf.Config{} + + if err = json.Unmarshal(data, cfg); err != nil { + return nil, errors.Wrap(err, "unmarshal") + } + + srv, err := perf.NewClient(cfg) + if err != nil { + return nil, errors.Wrap(err, "new allocator server") + } + + return srv, nil +} + +type runnable interface { + Run() error + Quit() +} + +func runAndWaitSignal(r runnable) error { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + errChan := make(chan error, 1) + + go func() { errChan <- r.Run() }() + + defer r.Quit() + + select { + case err := <-errChan: + return errors.Wrap(err, "run error") + case <-signalChan: + return nil + } +} diff --git a/test/allocator/perf/client.go b/test/allocator/perf/client.go new file mode 100644 index 0000000..8bd0d33 --- /dev/null +++ b/test/allocator/perf/client.go @@ -0,0 +1,142 @@ +package perf + +import ( + "context" + "time" + + "github.com/go-logr/logr" + "github.com/newcloudtechnologies/memlimiter/utils/config/prepare" + "github.com/rcrowley/go-metrics" + "golang.org/x/time/rate" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/durationpb" + + "github.com/newcloudtechnologies/memlimiter/test/allocator/schema" + "github.com/newcloudtechnologies/memlimiter/utils/breaker" + "github.com/pkg/errors" +) + +// Client - нагрузочный клиент. +type Client struct { + startTime time.Time + logger logr.Logger + grpcConn *grpc.ClientConn + client schema.AllocatorClient + breaker *breaker.Breaker + requestsInFlight metrics.Counter + cfg *Config +} + +// Run запускает нагрузочную сессию. +func (p *Client) Run() error { + if err := p.breaker.Inc(); err != nil { + return errors.Wrap(err, "breaker inc") + } + + defer p.breaker.Dec() + + monitoringTicker := time.NewTicker(time.Second) + defer monitoringTicker.Stop() + + timer := time.NewTimer(p.cfg.LoadDuration.Duration) + defer timer.Stop() + + limiter := rate.NewLimiter(p.cfg.RPS, 1) + + for { + // ожидаем, пока лимитер разрешит выполнять запрос + if err := limiter.Wait(p.breaker); err != nil { + return errors.Wrap(err, "limiter wait") + } + + // запрос + if err := p.breaker.Inc(); err != nil { + return errors.Wrap(err, "breaker inc") + } + + go p.makeRequest() + + select { + case <-monitoringTicker.C: + // периодическая печать прогресса + p.printProgress() + case <-timer.C: + // завершение нагрузки + return nil + default: + } + } +} + +func (p *Client) makeRequest() { + defer p.breaker.Dec() + + // обновление счётчика запросов в полете + p.requestsInFlight.Inc(1) + defer p.requestsInFlight.Dec(1) + + ctx, cancel := context.WithTimeout(p.breaker, p.cfg.RequestTimeout.Duration) + defer cancel() + + request := &schema.MakeAllocationRequest{ + Size: p.cfg.AllocationSize.Value, + } + + if p.cfg.PauseDuration.Duration != 0 { + request.Duration = durationpb.New(p.cfg.PauseDuration.Duration) + } + + _, err := p.client.MakeAllocation(ctx, request) + if err != nil { + p.logger.Error(err, "make allocation request") + } +} + +func (p *Client) printProgress() { + p.logger.Info( + "progress", + "elapsed_time", time.Since(p.startTime), + "in_flight", p.requestsInFlight.Count(), + ) +} + +// Quit корректно завершает работу нагрузчика. +func (p *Client) Quit() { + p.breaker.ShutdownAndWait() + + if err := p.grpcConn.Close(); err != nil { + p.logger.Error(err, "gprc connection close") + } +} + +// NewClient создаёт нагрузочный клиент. +func NewClient(cfg *Config) (*Client, error) { + if err := prepare.Prepare(cfg); err != nil { + return nil, errors.Wrap(err, "configs prepare") + } + + // FIXME: + /* + logger, err := gaben.FromConfig(cfg.Logging) + if err != nil { + return nil, errors.Wrap(err, "gaben from config") + } + */ + + grpcConn, err := grpc.Dial(cfg.Endpoint, grpc.WithInsecure()) + if err != nil { + return nil, errors.Wrap(err, "dial error") + } + + client := schema.NewAllocatorClient(grpcConn) + + return &Client{ + grpcConn: grpcConn, + logger: logr.Logger{}, // FIXME + client: client, + startTime: time.Now(), + cfg: cfg, + requestsInFlight: metrics.NewCounter(), + breaker: breaker.NewBreaker(), + }, nil +} diff --git a/test/allocator/perf/config.go b/test/allocator/perf/config.go new file mode 100644 index 0000000..4a35968 --- /dev/null +++ b/test/allocator/perf/config.go @@ -0,0 +1,48 @@ +package perf + +import ( + "golang.org/x/time/rate" + + "github.com/newcloudtechnologies/memlimiter/utils/config/bytes" + "github.com/newcloudtechnologies/memlimiter/utils/config/duration" + "github.com/pkg/errors" +) + +// Config - конфигурация нагрузочного клиента. +type Config struct { + Endpoint string `json:"endpoint"` // адрес сервера + RPS rate.Limit `json:"rps"` // количество запросов к сервису в секунду + LoadDuration duration.Duration `json:"load_duration"` // продолжительность тестовой сессии + AllocationSize bytes.Bytes `json:"allocation_size"` // размер аллокации в каждом запросе + PauseDuration duration.Duration `json:"pause_duration"` // пауза в хендлере сервиса (чтобы аллокация подольше продержались в памяти) + RequestTimeout duration.Duration `json:"request_timeout"` // таймаут запроса к сервису +} + +// Prepare - валидатор конфига. +func (c *Config) Prepare() error { + if c.Endpoint == "" { + return errors.New("empty endpoint") + } + + if c.RPS == 0 { + return errors.New("empty rps") + } + + if c.LoadDuration.Duration == 0 { + return errors.New("empty load duration") + } + + if c.AllocationSize.Value == 0 { + return errors.New("empty allocation size") + } + + if c.PauseDuration.Duration == 0 { + return errors.New("empty pause duration") + } + + if c.RequestTimeout.Duration == 0 { + return errors.New("empty request timeout") + } + + return nil +} diff --git a/test/allocator/perf/config.json b/test/allocator/perf/config.json new file mode 100644 index 0000000..ec76e45 --- /dev/null +++ b/test/allocator/perf/config.json @@ -0,0 +1,8 @@ +{ + "endpoint": "localhost:1988", + "rps": 100, + "load_duration": "1h", + "allocation_size": "1M", + "pause_duration": "5s", + "request_timeout": "1m" +} diff --git a/test/allocator/perf/doc.go b/test/allocator/perf/doc.go new file mode 100644 index 0000000..6596c31 --- /dev/null +++ b/test/allocator/perf/doc.go @@ -0,0 +1,2 @@ +// Package perf - код клиента к тестовому сервису +package perf diff --git a/test/allocator/schema/allocator.pb.go b/test/allocator/schema/allocator.pb.go new file mode 100644 index 0000000..4f60ab3 --- /dev/null +++ b/test/allocator/schema/allocator.pb.go @@ -0,0 +1,244 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.14.0 +// source: allocator.proto + +package schema + +import ( + reflect "reflect" + sync "sync" + + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + durationpb "google.golang.org/protobuf/types/known/durationpb" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +// MakeAllocationRequest - запрос на аллокацию. +type MakeAllocationRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // size - размер аллокации + Size uint64 `protobuf:"varint,1,opt,name=size,proto3" json:"size,omitempty"` + // duration - продолжительность времени, на которое надо заблокировать запрос после аллокации + Duration *durationpb.Duration `protobuf:"bytes,2,opt,name=duration,proto3" json:"duration,omitempty"` +} + +func (x *MakeAllocationRequest) Reset() { + *x = MakeAllocationRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_allocator_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MakeAllocationRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MakeAllocationRequest) ProtoMessage() {} + +func (x *MakeAllocationRequest) ProtoReflect() protoreflect.Message { + mi := &file_allocator_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MakeAllocationRequest.ProtoReflect.Descriptor instead. +func (*MakeAllocationRequest) Descriptor() ([]byte, []int) { + return file_allocator_proto_rawDescGZIP(), []int{0} +} + +func (x *MakeAllocationRequest) GetSize() uint64 { + if x != nil { + return x.Size + } + return 0 +} + +func (x *MakeAllocationRequest) GetDuration() *durationpb.Duration { + if x != nil { + return x.Duration + } + return nil +} + +// MakeAllocationResponse - ответ на запрос на аллокацию. +type MakeAllocationResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // value - просто некоторое значение + Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *MakeAllocationResponse) Reset() { + *x = MakeAllocationResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_allocator_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MakeAllocationResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MakeAllocationResponse) ProtoMessage() {} + +func (x *MakeAllocationResponse) ProtoReflect() protoreflect.Message { + mi := &file_allocator_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MakeAllocationResponse.ProtoReflect.Descriptor instead. +func (*MakeAllocationResponse) Descriptor() ([]byte, []int) { + return file_allocator_proto_rawDescGZIP(), []int{1} +} + +func (x *MakeAllocationResponse) GetValue() uint64 { + if x != nil { + return x.Value + } + return 0 +} + +var File_allocator_proto protoreflect.FileDescriptor + +var file_allocator_proto_rawDesc = []byte{ + 0x0a, 0x0f, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x62, 0x0a, 0x15, 0x4d, 0x61, 0x6b, + 0x65, 0x41, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x12, 0x35, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x2e, 0x0a, + 0x16, 0x4d, 0x61, 0x6b, 0x65, 0x41, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x5e, 0x0a, + 0x09, 0x41, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x51, 0x0a, 0x0e, 0x4d, 0x61, + 0x6b, 0x65, 0x41, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1d, 0x2e, 0x73, + 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x4d, 0x61, 0x6b, 0x65, 0x41, 0x6c, 0x6c, 0x6f, 0x63, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x73, 0x63, + 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x4d, 0x61, 0x6b, 0x65, 0x41, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x43, 0x5a, + 0x41, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x73, 0x74, 0x61, 0x67, 0x65, 0x6f, 0x66, 0x66, + 0x69, 0x63, 0x65, 0x2e, 0x72, 0x75, 0x2f, 0x55, 0x43, 0x53, 0x2d, 0x43, 0x4f, 0x4d, 0x4d, 0x4f, + 0x4e, 0x2f, 0x6d, 0x65, 0x6d, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, + 0x74, 0x2f, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x73, 0x63, 0x68, 0x65, + 0x6d, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_allocator_proto_rawDescOnce sync.Once + file_allocator_proto_rawDescData = file_allocator_proto_rawDesc +) + +func file_allocator_proto_rawDescGZIP() []byte { + file_allocator_proto_rawDescOnce.Do(func() { + file_allocator_proto_rawDescData = protoimpl.X.CompressGZIP(file_allocator_proto_rawDescData) + }) + return file_allocator_proto_rawDescData +} + +var file_allocator_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_allocator_proto_goTypes = []interface{}{ + (*MakeAllocationRequest)(nil), // 0: schema.MakeAllocationRequest + (*MakeAllocationResponse)(nil), // 1: schema.MakeAllocationResponse + (*durationpb.Duration)(nil), // 2: google.protobuf.Duration +} +var file_allocator_proto_depIdxs = []int32{ + 2, // 0: schema.MakeAllocationRequest.duration:type_name -> google.protobuf.Duration + 0, // 1: schema.Allocator.MakeAllocation:input_type -> schema.MakeAllocationRequest + 1, // 2: schema.Allocator.MakeAllocation:output_type -> schema.MakeAllocationResponse + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_allocator_proto_init() } +func file_allocator_proto_init() { + if File_allocator_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_allocator_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MakeAllocationRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_allocator_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MakeAllocationResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_allocator_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_allocator_proto_goTypes, + DependencyIndexes: file_allocator_proto_depIdxs, + MessageInfos: file_allocator_proto_msgTypes, + }.Build() + File_allocator_proto = out.File + file_allocator_proto_rawDesc = nil + file_allocator_proto_goTypes = nil + file_allocator_proto_depIdxs = nil +} diff --git a/test/allocator/schema/allocator.proto b/test/allocator/schema/allocator.proto new file mode 100644 index 0000000..8771ce0 --- /dev/null +++ b/test/allocator/schema/allocator.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; + +package schema; + +option go_package = "gitlab.stageoffice.ru/UCS-COMMON/memlimiter/test/allocator/schema"; + +import "google/protobuf/duration.proto"; + +// Allocator - тестовый сервис, который просто делает аллокации во время обработки запроса +service Allocator { + rpc MakeAllocation (MakeAllocationRequest) returns (MakeAllocationResponse) {} +} + +// MakeAllocationRequest - запрос на аллокацию +message MakeAllocationRequest { + // size - размер аллокации + uint64 size = 1; + // duration - продолжительность времени, на которое надо заблокировать запрос после аллокации + google.protobuf.Duration duration = 2; +} + +// MakeAllocationResponse - ответ на запрос на аллокацию +message MakeAllocationResponse { + // value - просто некоторое значение + uint64 value = 1; +} \ No newline at end of file diff --git a/test/allocator/schema/allocator_grpc.pb.go b/test/allocator/schema/allocator_grpc.pb.go new file mode 100644 index 0000000..cb47df5 --- /dev/null +++ b/test/allocator/schema/allocator_grpc.pb.go @@ -0,0 +1,102 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package schema + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// AllocatorClient is the client API for Allocator service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type AllocatorClient interface { + MakeAllocation(ctx context.Context, in *MakeAllocationRequest, opts ...grpc.CallOption) (*MakeAllocationResponse, error) +} + +type allocatorClient struct { + cc grpc.ClientConnInterface +} + +func NewAllocatorClient(cc grpc.ClientConnInterface) AllocatorClient { + return &allocatorClient{cc} +} + +func (c *allocatorClient) MakeAllocation(ctx context.Context, in *MakeAllocationRequest, opts ...grpc.CallOption) (*MakeAllocationResponse, error) { + out := new(MakeAllocationResponse) + err := c.cc.Invoke(ctx, "/schema.Allocator/MakeAllocation", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// AllocatorServer is the server API for Allocator service. +// All implementations must embed UnimplementedAllocatorServer +// for forward compatibility. +type AllocatorServer interface { + MakeAllocation(context.Context, *MakeAllocationRequest) (*MakeAllocationResponse, error) + mustEmbedUnimplementedAllocatorServer() +} + +// UnimplementedAllocatorServer must be embedded to have forward compatible implementations. +type UnimplementedAllocatorServer struct { +} + +func (UnimplementedAllocatorServer) MakeAllocation(context.Context, *MakeAllocationRequest) (*MakeAllocationResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method MakeAllocation not implemented") +} +func (UnimplementedAllocatorServer) mustEmbedUnimplementedAllocatorServer() {} + +// UnsafeAllocatorServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AllocatorServer will +// result in compilation errors. +type UnsafeAllocatorServer interface { + mustEmbedUnimplementedAllocatorServer() +} + +func RegisterAllocatorServer(s grpc.ServiceRegistrar, srv AllocatorServer) { + s.RegisterService(&Allocator_ServiceDesc, srv) +} + +func _Allocator_MakeAllocation_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MakeAllocationRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AllocatorServer).MakeAllocation(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/schema.Allocator/MakeAllocation", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AllocatorServer).MakeAllocation(ctx, req.(*MakeAllocationRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Allocator_ServiceDesc is the grpc.ServiceDesc for Allocator service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy). +var Allocator_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "schema.Allocator", + HandlerType: (*AllocatorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "MakeAllocation", + Handler: _Allocator_MakeAllocation_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "allocator.proto", +} diff --git a/test/allocator/schema/generate.sh b/test/allocator/schema/generate.sh new file mode 100755 index 0000000..ff6dab7 --- /dev/null +++ b/test/allocator/schema/generate.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +set -e +set -x + +protoc -I/usr/include \ + -I. \ + --go_out=. \ + --go_opt=paths=source_relative \ + --go-grpc_out=. \ + --go-grpc_opt=paths=source_relative \ + allocator.proto diff --git a/test/allocator/server/config.go b/test/allocator/server/config.go new file mode 100644 index 0000000..a91fb08 --- /dev/null +++ b/test/allocator/server/config.go @@ -0,0 +1,26 @@ +package server + +import ( + "github.com/newcloudtechnologies/memlimiter" + "github.com/pkg/errors" +) + +// Config - верхнеуровневая конфигурация сервиса Allocator. +type Config struct { + MemLimiter *memlimiter.Config `json:"memlimiter"` //nolint:tagliatelle + Server *ServerConfig `json:"server"` +} + +// ServerConfig - конфигурация GRPC сервера. +type ServerConfig struct { + ListenEndpoint string `json:"listen_endpoint"` +} + +// Prepare - валидатор конфига. +func (c *Config) Prepare() error { + if c.Server == nil { + return errors.New("server is empty") + } + + return nil +} diff --git a/test/allocator/server/config.json b/test/allocator/server/config.json new file mode 100644 index 0000000..6b2290f --- /dev/null +++ b/test/allocator/server/config.json @@ -0,0 +1,17 @@ +{ + "memlimiter": { + "controller_nextgc": { + "rss_limit": "1G", + "danger_zone_gogc": 50, + "danger_zone_throttling": 90, + "period": "5s", + "component_proportional": { + "coefficient": 20, + "window_size": 20 + } + } + }, + "server": { + "listen_endpoint": "0.0.0.0:1988" + } +} \ No newline at end of file diff --git a/test/allocator/server/doc.go b/test/allocator/server/doc.go new file mode 100644 index 0000000..149cce3 --- /dev/null +++ b/test/allocator/server/doc.go @@ -0,0 +1,2 @@ +// Package server - тестовый сервис, выполняющий аллокации +package server diff --git a/test/allocator/server/server.go b/test/allocator/server/server.go new file mode 100644 index 0000000..87889c9 --- /dev/null +++ b/test/allocator/server/server.go @@ -0,0 +1,116 @@ +package server + +import ( + "context" + "math/rand" + "net" + "time" + + "github.com/go-logr/logr" + "github.com/newcloudtechnologies/memlimiter" + "github.com/newcloudtechnologies/memlimiter/utils" + "github.com/newcloudtechnologies/memlimiter/utils/config/prepare" + "google.golang.org/grpc" + + "github.com/newcloudtechnologies/memlimiter/test/allocator/schema" + "github.com/pkg/errors" +) + +// Server - интерфейс сервера. +type Server interface { + schema.AllocatorServer + // Run запускает в работу сервис. Блокирующий вызов. + Run() error + // Quit корректное завершение работы сервера. + Quit() +} + +var _ Server = (*serverImpl)(nil) + +type serverImpl struct { + schema.UnimplementedAllocatorServer + cfg *Config + logger logr.Logger + grpcServer *grpc.Server +} + +func (srv *serverImpl) MakeAllocation(_ context.Context, request *schema.MakeAllocationRequest) (*schema.MakeAllocationResponse, error) { + var array []byte + + // аллоцируем массив + if request.Size != 0 { + array = make([]byte, int(request.Size)) + //nolint:gosec + if _, err := rand.Read(array); err != nil { + return nil, errors.Wrap(err, "rand read") + } + } + + // ждём определённое время, чтобы он побыл в оперативной памяти - это имитация бизнес-логики + duration := request.Duration.AsDuration() + if duration != 0 { + time.Sleep(duration) + } + + // какая-то имитация работы, чтоб компилятор не оптимизировал массив + x := uint64(0) + for i := 0; i < len(array); i++ { + x += uint64(array[i]) + } + + return &schema.MakeAllocationResponse{Value: x}, nil +} + +func (srv *serverImpl) Run() error { + endpoint := srv.cfg.Server.ListenEndpoint + + listener, err := net.Listen("tcp", endpoint) + if err != nil { + return errors.Wrap(err, "net listen") + } + + srv.logger.Info("starting listening", "endpoint", endpoint) + + if err = srv.grpcServer.Serve(listener); err != nil { + return errors.Wrap(err, "grpc server serve") + } + + return nil +} + +func (srv *serverImpl) Quit() { + srv.logger.Info("terminating server") + srv.grpcServer.Stop() +} + +// NewAllocatorServer - конструктор сервера. +func NewAllocatorServer(logger logr.Logger, cfg *Config) (Server, error) { + if err := prepare.Prepare(cfg); err != nil { + return nil, errors.Wrap(err, "configs prepare") + } + + srv := &serverImpl{logger: logger, cfg: cfg} + + ml, err := memlimiter.NewServiceFromConfig( + logger, + cfg.MemLimiter, + utils.NewUngracefulApplicationTerminator(logger), + nil, + nil, + ) + + if err != nil { + return nil, errors.Wrap(err, "new memlimiter from config") + } + + options := []grpc.ServerOption{ + grpc.UnaryInterceptor(ml.MakeUnaryServerInterceptor()), + grpc.StreamInterceptor(ml.MakeStreamServerInterceptor()), + } + + srv.grpcServer = grpc.NewServer(options...) + + schema.RegisterAllocatorServer(srv.grpcServer, srv) + + return srv, nil +} diff --git a/test/infra/allocator/Dockerfile b/test/infra/allocator/Dockerfile new file mode 100644 index 0000000..3b75200 --- /dev/null +++ b/test/infra/allocator/Dockerfile @@ -0,0 +1,6 @@ +# TODO: либо менять базовый образ при смене ОС, либо наследоваться от образа с компилятором и собирать исходники прямо там +FROM fedora:35 + +ADD ucs-allocator /usr/local/bin + +CMD /usr/local/bin/ucs-allocator server -c /etc/ucs/allocator/config.json \ No newline at end of file diff --git a/test/infra/docker-compose.yml b/test/infra/docker-compose.yml new file mode 100644 index 0000000..b4eaa00 --- /dev/null +++ b/test/infra/docker-compose.yml @@ -0,0 +1,40 @@ +version: "3.3" +services: + hydra: + network_mode: host + volumes: + - ${PWD}/hydra:/etc/ucs/hydra + image: hub.stageoffice.ru/hydra + devkalion: + network_mode: host + volumes: + - ${PWD}/devkalion:/etc/ucs/devkalion + image: hub.stageoffice.ru/devkalion + gesiona: + network_mode: host + volumes: + - ${PWD}/gesiona:/etc/ucs/gesiona + - ${PWD}/targets:/var/lib/targets + image: hub.stageoffice.ru/gesiona + prometheus: + network_mode: host + volumes: + - ${PWD}/prometheus:/etc/prometheus + - ${PWD}/targets:/var/lib/targets + image: prom/prometheus + grafana: + network_mode: host + volumes: + - ${PWD}/grafana/provisioning/dashboards:/etc/grafana/provisioning/dashboards + - ${PWD}/grafana/provisioning/datasources:/etc/grafana/provisioning/datasources + - ${GOPATH}/src/gitlab.stageoffice.ru/mdc-devops/ansible_collections/nct.monitoring/roles/grafana/files:/var/lib/grafana/dashboards + image: grafana/grafana:7.5.10 + allocator: + build: ${PWD}/allocator + network_mode: host + deploy: + resources: + limits: + memory: 1G + volumes: + - ${PWD}/allocator:/etc/ucs/allocator diff --git a/test/infra/prepare.sh b/test/infra/prepare.sh new file mode 100755 index 0000000..3707997 --- /dev/null +++ b/test/infra/prepare.sh @@ -0,0 +1,35 @@ +#!/usr/bin/bash +set -e +set -x + +PROJECT_DIR="${GOPATH}/src/gitlab.stageoffice.ru/UCS-COMMON/memlimiter" + +# 1. подготовка контейнера с сервисом аллокатор +ALLOCATOR_SRC_DIR="${PROJECT_DIR}/test/allocator" +ALLOCATOR_DST_DIR="${PROJECT_DIR}/test/infra/allocator" +pushd "${ALLOCATOR_SRC_DIR}" +go build -o ucs-allocator +cp ucs-allocator "${ALLOCATOR_DST_DIR}" +cp ./server/config.json "${ALLOCATOR_DST_DIR}" +popd + +# 2. подготовка инфраструктуры Grafana + +# не у каждого человека есть репозиторий с Grafana +GRAFANA_SRC_DIR="${GOPATH}/src/gitlab.stageoffice.ru/mdc-devops/ansible_collections/nct.monitoring" + +if [ -d "${GRAFANA_SRC_DIR}" ] +then + pushd "${GRAFANA_SRC_DIR}" +# git checkout master +# git pull origin master +else + mkdir -p "${GRAFANA_SRC_DIR}" + pushd "${GRAFANA_SRC_DIR}" + git init + git remote add origin git@gitlab.stageoffice.ru:mdc-devops/ansible_collections/nct.monitoring.git + git fetch + git checkout origin/master -ft +fi + +popd \ No newline at end of file From f510dd32ba55f5c3913d12aa2a6b66a0f221f4ea Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Thu, 2 Jun 2022 17:32:33 +0300 Subject: [PATCH 2/4] CFS-2672 | feat: add default Subscription implementation --- backpressure/interface.go | 2 +- backpressure/mock.go | 2 +- backpressure/operator.go | 4 +- backpressure/throttler.go | 4 +- controller/interface.go | 2 +- controller/nextgc/controller.go | 22 ++++----- controller/nextgc/controller_test.go | 8 ++-- interface.go | 2 +- memlimiter.go | 4 +- stats/memlimiter.go | 26 +++++------ stats/mock.go | 4 +- stats/service.go | 8 +--- stats/subscription.go | 67 ++++++++++++++++++++++++++++ test/allocator/server/server.go | 3 +- 14 files changed, 110 insertions(+), 48 deletions(-) create mode 100644 stats/subscription.go diff --git a/backpressure/interface.go b/backpressure/interface.go index a6dfe10..c83a86e 100644 --- a/backpressure/interface.go +++ b/backpressure/interface.go @@ -20,5 +20,5 @@ type Operator interface { // AllowRequest используется интерсепторами запросов для подавления части запросов во время пиковых нагрузок. AllowRequest() bool // GetStats возвращает статистику подсистемы backpressure - GetStats() *stats.Backpressure + GetStats() *stats.BackpressureStats } diff --git a/backpressure/mock.go b/backpressure/mock.go index 4324d09..7d868c9 100644 --- a/backpressure/mock.go +++ b/backpressure/mock.go @@ -11,7 +11,7 @@ type OperatorMock struct { mock.Mock } -func (m *OperatorMock) GetStats() *stats.Backpressure { +func (m *OperatorMock) GetStats() *stats.BackpressureStats { // TODO implement me panic("implement me") } diff --git a/backpressure/operator.go b/backpressure/operator.go index e9a150e..800d2ad 100644 --- a/backpressure/operator.go +++ b/backpressure/operator.go @@ -17,8 +17,8 @@ type operatorImpl struct { logger logr.Logger } -func (b *operatorImpl) GetStats() *stats.Backpressure { - result := &stats.Backpressure{ +func (b *operatorImpl) GetStats() *stats.BackpressureStats { + result := &stats.BackpressureStats{ Throttling: b.throttler.getStats(), } diff --git a/backpressure/throttler.go b/backpressure/throttler.go index 3dd9acf..db5f61b 100644 --- a/backpressure/throttler.go +++ b/backpressure/throttler.go @@ -45,8 +45,8 @@ func (t *throttler) setThreshold(value uint32) error { return nil } -func (t *throttler) getStats() *stats.Throttling { - return &stats.Throttling{ +func (t *throttler) getStats() *stats.ThrottlingStats { + return &stats.ThrottlingStats{ Total: uint64(t.requestsTotal.Count()), Passed: uint64(t.requestsPassed.Count()), Throttled: uint64(t.requestsThrottled.Count()), diff --git a/controller/interface.go b/controller/interface.go index f89c0a2..18e787b 100644 --- a/controller/interface.go +++ b/controller/interface.go @@ -6,6 +6,6 @@ import ( // Controller - обобщённый интерфейс регулятора. type Controller interface { - GetStats() (*stats.Controller, error) + GetStats() (*stats.ControllerStats, error) Quit() } diff --git a/controller/nextgc/controller.go b/controller/nextgc/controller.go index 58cbd7c..ed9a319 100644 --- a/controller/nextgc/controller.go +++ b/controller/nextgc/controller.go @@ -50,15 +50,15 @@ type controllerImpl struct { } type getStatsRequest struct { - result chan *stats.Controller + result chan *stats.ControllerStats } -func (r *getStatsRequest) respondWith(resp *stats.Controller) { +func (r *getStatsRequest) respondWith(resp *stats.ControllerStats) { r.result <- resp } -func (c *controllerImpl) GetStats() (*stats.Controller, error) { - req := &getStatsRequest{result: make(chan *stats.Controller, 1)} +func (c *controllerImpl) GetStats() (*stats.ControllerStats, error) { + req := &getStatsRequest{result: make(chan *stats.ControllerStats, 1)} select { case c.getStatsChan <- req: @@ -112,7 +112,7 @@ func (c *controllerImpl) loop() { } } -func (c *controllerImpl) updateState(serviceStats *stats.Service) error { +func (c *controllerImpl) updateState(serviceStats *stats.ServiceStats) error { // извлекаем оперативную информацию о спец. потребителях памяти, если она предоставляется клиентом if c.consumptionReporter != nil { var err error @@ -134,7 +134,7 @@ func (c *controllerImpl) updateState(serviceStats *stats.Service) error { return nil } -func (c *controllerImpl) updateUtilization(serviceStats *stats.Service) { +func (c *controllerImpl) updateUtilization(serviceStats *stats.ServiceStats) { // Чтобы понять, сколько памяти можно аллоцировать на нужды Go, // требуется вычесть из общего лимита на RSS память, в явном виде потраченную в CGO. // Иными словами, если аллокации в CGO будут расти, то аллокации в Go должны ужиматься. @@ -224,21 +224,21 @@ func (c *controllerImpl) applyControlValue() error { return nil } -func (c *controllerImpl) aggregateStats() *stats.Controller { - res := &stats.Controller{ - MemoryBudget: &stats.MemoryBudget{ +func (c *controllerImpl) aggregateStats() *stats.ControllerStats { + res := &stats.ControllerStats{ + MemoryBudget: &stats.MemoryBudgetStats{ RSSLimit: c.cfg.RSSLimit.Value, GoAllocLimit: c.goAllocLimit, Utilization: c.utilization, }, - NextGC: &stats.ControllerNextGC{ + NextGC: &stats.ControllerNextGCStats{ P: c.pValue, Output: c.sumValue, }, } if c.consumptionReport != nil { - res.MemoryBudget.SpecialConsumers = &stats.SpecialConsumers{} + res.MemoryBudget.SpecialConsumers = &stats.SpecialConsumersStats{} res.MemoryBudget.SpecialConsumers.Go = c.consumptionReport.Go res.MemoryBudget.SpecialConsumers.Cgo = c.consumptionReport.Cgo } diff --git a/controller/nextgc/controller_test.go b/controller/nextgc/controller_test.go index 22c8fcb..a12104f 100644 --- a/controller/nextgc/controller_test.go +++ b/controller/nextgc/controller_test.go @@ -38,17 +38,17 @@ func TestController(t *testing.T) { } // Первый вариант статистики описывает ситуацию, когда память близка к исчерпанию - memoryBudgetExhausted := &stats.Service{ + memoryBudgetExhausted := &stats.ServiceStats{ NextGC: 950 * bytefmt.MEGABYTE, // память потрачена на 95% } // Во втором варианте бюджет памяти возвращается в норму - memoryBudgetNormal := &stats.Service{ + memoryBudgetNormal := &stats.ServiceStats{ NextGC: 300 * bytefmt.MEGABYTE, // память потрачена на 50% } subscriptionMock := &stats.ServiceSubscriptionMock{ - Chan: make(chan *stats.Service), + Chan: make(chan *stats.ServiceStats), } // канал закрывается, когда backpressure.Operator получит все необходимые команды @@ -63,7 +63,7 @@ func TestController(t *testing.T) { for { select { case <-ticker.C: - serviceStats, ok := serviceStatsContainer.Load().(*stats.Service) + serviceStats, ok := serviceStatsContainer.Load().(*stats.ServiceStats) if ok { subscriptionMock.Chan <- serviceStats } diff --git a/interface.go b/interface.go index 3b18358..ba0fbbc 100644 --- a/interface.go +++ b/interface.go @@ -7,7 +7,7 @@ import ( // Service - верхнеуровневый интерфейс системы управления бюджетом оперативной памяти. type Service interface { - GetStats() (*stats.Memlimiter, error) + GetStats() (*stats.MemlimiterStats, error) // MakeUnaryServerInterceptor возвращает интерсептор для унарных запросов MakeUnaryServerInterceptor() grpc.UnaryServerInterceptor // MakeStreamServerInterceptor возвращает интерсептор для стримовых запросов diff --git a/memlimiter.go b/memlimiter.go index 362c750..2b2def7 100644 --- a/memlimiter.go +++ b/memlimiter.go @@ -26,7 +26,7 @@ type serviceImpl struct { logger logr.Logger } -func (s *serviceImpl) GetStats() (*stats.Memlimiter, error) { +func (s *serviceImpl) GetStats() (*stats.MemlimiterStats, error) { controllerStats, err := s.controller.GetStats() if err != nil { return nil, errors.Wrap(err, "controller get stats") @@ -34,7 +34,7 @@ func (s *serviceImpl) GetStats() (*stats.Memlimiter, error) { backpressureStats := s.backpressureOperator.GetStats() - return &stats.Memlimiter{ + return &stats.MemlimiterStats{ Controller: controllerStats, Backpressure: backpressureStats, }, nil diff --git a/stats/memlimiter.go b/stats/memlimiter.go index 925a015..876f328 100644 --- a/stats/memlimiter.go +++ b/stats/memlimiter.go @@ -4,39 +4,39 @@ import ( "fmt" ) -type Memlimiter struct { - Controller *Controller - Backpressure *Backpressure +type MemlimiterStats struct { + Controller *ControllerStats + Backpressure *BackpressureStats } -type Controller struct { - MemoryBudget *MemoryBudget - NextGC *ControllerNextGC +type ControllerStats struct { + MemoryBudget *MemoryBudgetStats + NextGC *ControllerNextGCStats } -type MemoryBudget struct { +type MemoryBudgetStats struct { RSSLimit uint64 GoAllocLimit uint64 Utilization float64 - SpecialConsumers *SpecialConsumers + SpecialConsumers *SpecialConsumersStats } -type SpecialConsumers struct { +type SpecialConsumersStats struct { Go map[string]uint64 Cgo map[string]uint64 } -type ControllerNextGC struct { +type ControllerNextGCStats struct { P float64 Output float64 } -type Backpressure struct { - Throttling *Throttling +type BackpressureStats struct { + Throttling *ThrottlingStats ControlParameters *ControlParameters } -type Throttling struct { +type ThrottlingStats struct { Total uint64 Passed uint64 Throttled uint64 diff --git a/stats/mock.go b/stats/mock.go index 026d79a..98350d5 100644 --- a/stats/mock.go +++ b/stats/mock.go @@ -7,11 +7,11 @@ import ( var _ Subscription = (*ServiceSubscriptionMock)(nil) type ServiceSubscriptionMock struct { - Chan chan *Service + Chan chan *ServiceStats mock.Mock } -func (m *ServiceSubscriptionMock) Updates() <-chan *Service { +func (m *ServiceSubscriptionMock) Updates() <-chan *ServiceStats { return m.Chan } diff --git a/stats/service.go b/stats/service.go index 327a470..e8333db 100644 --- a/stats/service.go +++ b/stats/service.go @@ -1,12 +1,6 @@ package stats -type Service struct { +type ServiceStats struct { NextGC uint64 Custom interface{} } - -// Subscription - интерфейс подписки на оперативную статистику -type Subscription interface { - Updates() <-chan *Service - Quit() -} diff --git a/stats/subscription.go b/stats/subscription.go new file mode 100644 index 0000000..7081fc4 --- /dev/null +++ b/stats/subscription.go @@ -0,0 +1,67 @@ +package stats + +import ( + "runtime" + "time" + + "github.com/newcloudtechnologies/memlimiter/utils/breaker" +) + +// Subscription - интерфейс подписки на оперативную статистику +type Subscription interface { + Updates() <-chan *ServiceStats + Quit() +} + +type subscriptionDefault struct { + outChan chan *ServiceStats + period time.Duration + breaker *breaker.Breaker +} + +func (s *subscriptionDefault) Updates() <-chan *ServiceStats { return s.outChan } + +func (s *subscriptionDefault) Quit() { + s.breaker.ShutdownAndWait() +} + +func (s *subscriptionDefault) makeServiceStats() *ServiceStats { + ms := &runtime.MemStats{} + runtime.ReadMemStats(ms) + + return &ServiceStats{ + NextGC: ms.NextGC, + // don't forget to put real stats of your service in your own implementation + Custom: nil, + } +} + +func NewSubscriptionDefault(period time.Duration) Subscription { + ss := &subscriptionDefault{ + outChan: make(chan *ServiceStats), + period: period, + breaker: breaker.NewBreakerWithInitValue(1), + } + + go func() { + ticker := time.NewTicker(period) + defer ticker.Stop() + + defer ss.breaker.Dec() + + for { + select { + case <-ticker.C: + select { + case ss.outChan <- ss.makeServiceStats(): + case <-ss.breaker.Done(): + return + } + case <-ss.breaker.Done(): + return + } + } + }() + + return ss +} diff --git a/test/allocator/server/server.go b/test/allocator/server/server.go index 87889c9..0b88c69 100644 --- a/test/allocator/server/server.go +++ b/test/allocator/server/server.go @@ -8,6 +8,7 @@ import ( "github.com/go-logr/logr" "github.com/newcloudtechnologies/memlimiter" + "github.com/newcloudtechnologies/memlimiter/stats" "github.com/newcloudtechnologies/memlimiter/utils" "github.com/newcloudtechnologies/memlimiter/utils/config/prepare" "google.golang.org/grpc" @@ -95,7 +96,7 @@ func NewAllocatorServer(logger logr.Logger, cfg *Config) (Server, error) { logger, cfg.MemLimiter, utils.NewUngracefulApplicationTerminator(logger), - nil, + stats.NewSubscriptionDefault(time.Second), nil, ) From 2a82e8f2ae304c7dc35d0f121582251e2fd143a1 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Thu, 2 Jun 2022 17:39:05 +0300 Subject: [PATCH 3/4] CFS-2672 | fix: panic in allocator perf client --- test/allocator/main.go | 10 +++++----- test/allocator/perf/client.go | 12 ++---------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/test/allocator/main.go b/test/allocator/main.go index c618bb3..e9a4b77 100644 --- a/test/allocator/main.go +++ b/test/allocator/main.go @@ -52,7 +52,7 @@ func main() { Required: true, }, }, - Action: actionPerf, + Action: func(context *cli.Context) error { return actionPerf(logger, context) }, }, }, } @@ -98,8 +98,8 @@ func makeServer(logger logr.Logger, c *cli.Context) (server.Server, error) { return srv, nil } -func actionPerf(c *cli.Context) error { - perfClient, err := makePerf(c) +func actionPerf(logger logr.Logger, c *cli.Context) error { + perfClient, err := makePerf(logger, c) if err != nil { return errors.Wrap(err, "make perf") } @@ -111,7 +111,7 @@ func actionPerf(c *cli.Context) error { return nil } -func makePerf(c *cli.Context) (*perf.Client, error) { +func makePerf(logger logr.Logger, c *cli.Context) (*perf.Client, error) { filename := c.String("config") data, err := ioutil.ReadFile(filepath.Clean(filename)) @@ -125,7 +125,7 @@ func makePerf(c *cli.Context) (*perf.Client, error) { return nil, errors.Wrap(err, "unmarshal") } - srv, err := perf.NewClient(cfg) + srv, err := perf.NewClient(logger, cfg) if err != nil { return nil, errors.Wrap(err, "new allocator server") } diff --git a/test/allocator/perf/client.go b/test/allocator/perf/client.go index 8bd0d33..452e9f5 100644 --- a/test/allocator/perf/client.go +++ b/test/allocator/perf/client.go @@ -110,19 +110,11 @@ func (p *Client) Quit() { } // NewClient создаёт нагрузочный клиент. -func NewClient(cfg *Config) (*Client, error) { +func NewClient(logger logr.Logger, cfg *Config) (*Client, error) { if err := prepare.Prepare(cfg); err != nil { return nil, errors.Wrap(err, "configs prepare") } - // FIXME: - /* - logger, err := gaben.FromConfig(cfg.Logging) - if err != nil { - return nil, errors.Wrap(err, "gaben from config") - } - */ - grpcConn, err := grpc.Dial(cfg.Endpoint, grpc.WithInsecure()) if err != nil { return nil, errors.Wrap(err, "dial error") @@ -132,7 +124,7 @@ func NewClient(cfg *Config) (*Client, error) { return &Client{ grpcConn: grpcConn, - logger: logr.Logger{}, // FIXME + logger: logger, client: client, startTime: time.Now(), cfg: cfg, From 08fe620b8c079a6615da08f700761fa4fdbed655 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Fri, 3 Jun 2022 15:43:30 +0300 Subject: [PATCH 4/4] CFS-2672 | feat: tests for external usage --- test/allocator/app/app.go | 95 +++++++++++++++++++++ test/allocator/app/factory.go | 75 +++++++++++++++++ test/allocator/main.go | 144 +------------------------------- test/allocator/server/server.go | 24 ++++-- 4 files changed, 191 insertions(+), 147 deletions(-) create mode 100644 test/allocator/app/app.go create mode 100644 test/allocator/app/factory.go diff --git a/test/allocator/app/app.go b/test/allocator/app/app.go new file mode 100644 index 0000000..5601da0 --- /dev/null +++ b/test/allocator/app/app.go @@ -0,0 +1,95 @@ +package app + +import ( + "os" + "os/signal" + "syscall" + + "github.com/go-logr/logr" + "github.com/pkg/errors" + "github.com/urfave/cli/v2" +) + +type App struct { + logger logr.Logger + factory Factory +} + +func (a *App) Run() { + app := &cli.App{ + Name: "allocator", + Usage: "test application for memlimiter", + Commands: cli.Commands{ + &cli.Command{ + Name: "server", + Usage: "allocator server app", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Usage: "configuration file", + Aliases: []string{"c"}, + Required: true, + }, + }, + Action: func(context *cli.Context) error { + r, err := a.factory.MakeServer(context) + if err != nil { + return errors.Wrap(err, "make server") + } + + return a.runAndWaitSignal(r) + }, + }, + &cli.Command{ + Name: "perf", + Usage: "allocator perf client", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Usage: "configuration file", + Aliases: []string{"c"}, + Required: true, + }, + }, + Action: func(context *cli.Context) error { + r, err := a.factory.MakePerfClient(context) + if err != nil { + return errors.Wrap(err, "make perf client") + } + + return a.runAndWaitSignal(r) + }, + }, + }, + } + + if err := app.Run(os.Args); err != nil { + a.logger.Error(err, "application run") + os.Exit(1) + } +} + +func (a *App) runAndWaitSignal(r Runnable) error { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + errChan := make(chan error, 1) + + go func() { errChan <- r.Run() }() + + defer r.Quit() + + select { + case err := <-errChan: + return errors.Wrap(err, "run error") + case <-signalChan: + return nil + } +} + +func NewApp(logger logr.Logger, factory Factory) *App { + return &App{ + logger: logger, + factory: factory, + } +} diff --git a/test/allocator/app/factory.go b/test/allocator/app/factory.go new file mode 100644 index 0000000..fb5c804 --- /dev/null +++ b/test/allocator/app/factory.go @@ -0,0 +1,75 @@ +package app + +import ( + "encoding/json" + "io/ioutil" + "path/filepath" + + "github.com/go-logr/logr" + "github.com/newcloudtechnologies/memlimiter/test/allocator/perf" + "github.com/newcloudtechnologies/memlimiter/test/allocator/server" + "github.com/pkg/errors" + "github.com/urfave/cli/v2" +) + +type Runnable interface { + Run() error + Quit() +} + +type Factory interface { + MakeServer(c *cli.Context) (Runnable, error) + MakePerfClient(c *cli.Context) (Runnable, error) +} + +type factoryDefault struct { + logger logr.Logger +} + +func (f *factoryDefault) MakeServer(c *cli.Context) (Runnable, error) { + filename := c.String("config") + + data, err := ioutil.ReadFile(filepath.Clean(filename)) + if err != nil { + return nil, errors.Wrap(err, "ioutil readfile") + } + + cfg := &server.Config{} + + if err = json.Unmarshal(data, cfg); err != nil { + return nil, errors.Wrap(err, "unmarshal") + } + + srv, err := server.NewAllocatorServer(f.logger, cfg) + if err != nil { + return nil, errors.Wrap(err, "new allocator server") + } + + return srv, nil +} + +func (f *factoryDefault) MakePerfClient(c *cli.Context) (Runnable, error) { + filename := c.String("config") + + data, err := ioutil.ReadFile(filepath.Clean(filename)) + if err != nil { + return nil, errors.Wrap(err, "ioutil readfile") + } + + cfg := &perf.Config{} + + if err = json.Unmarshal(data, cfg); err != nil { + return nil, errors.Wrap(err, "unmarshal") + } + + cl, err := perf.NewClient(f.logger, cfg) + if err != nil { + return nil, errors.Wrap(err, "new allocator server") + } + + return cl, nil +} + +func NewFactory(logger logr.Logger) Factory { + return &factoryDefault{logger: logger} +} diff --git a/test/allocator/main.go b/test/allocator/main.go index e9a4b77..ec3b96b 100644 --- a/test/allocator/main.go +++ b/test/allocator/main.go @@ -1,21 +1,11 @@ package main import ( - "encoding/json" - "io/ioutil" "log" "os" - "os/signal" - "path/filepath" - "syscall" - "github.com/go-logr/logr" "github.com/go-logr/stdr" - "github.com/pkg/errors" - "github.com/urfave/cli/v2" - - "github.com/newcloudtechnologies/memlimiter/test/allocator/perf" - "github.com/newcloudtechnologies/memlimiter/test/allocator/server" + "github.com/newcloudtechnologies/memlimiter/test/allocator/app" ) func main() { @@ -24,134 +14,6 @@ func main() { stdr.Options{LogCaller: stdr.All}, ) - app := &cli.App{ - Name: "allocator", - Usage: "test application for memlimiter", - Commands: cli.Commands{ - &cli.Command{ - Name: "server", - Usage: "allocator server app", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "config", - Usage: "configuration file", - Aliases: []string{"c"}, - Required: true, - }, - }, - Action: func(context *cli.Context) error { return actionServer(logger, context) }, - }, - &cli.Command{ - Name: "perf", - Usage: "allocator perf client", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "config", - Usage: "configuration file", - Aliases: []string{"c"}, - Required: true, - }, - }, - Action: func(context *cli.Context) error { return actionPerf(logger, context) }, - }, - }, - } - - if err := app.Run(os.Args); err != nil { - logger.Error(err, "application run") - os.Exit(1) - } -} - -func actionServer(logger logr.Logger, c *cli.Context) error { - srv, err := makeServer(logger, c) - if err != nil { - return errors.Wrap(err, "make server") - } - - if err := runAndWaitSignal(srv); err != nil { - return errors.Wrap(err, "run and wait signal") - } - - return nil -} - -func makeServer(logger logr.Logger, c *cli.Context) (server.Server, error) { - filename := c.String("config") - - data, err := ioutil.ReadFile(filepath.Clean(filename)) - if err != nil { - return nil, errors.Wrap(err, "ioutil readfile") - } - - cfg := &server.Config{} - - if err = json.Unmarshal(data, cfg); err != nil { - return nil, errors.Wrap(err, "unmarshal") - } - - srv, err := server.NewAllocatorServer(logger, cfg) - if err != nil { - return nil, errors.Wrap(err, "new allocator server") - } - - return srv, nil -} - -func actionPerf(logger logr.Logger, c *cli.Context) error { - perfClient, err := makePerf(logger, c) - if err != nil { - return errors.Wrap(err, "make perf") - } - - if err := runAndWaitSignal(perfClient); err != nil { - return errors.Wrap(err, "run and wait signal") - } - - return nil -} - -func makePerf(logger logr.Logger, c *cli.Context) (*perf.Client, error) { - filename := c.String("config") - - data, err := ioutil.ReadFile(filepath.Clean(filename)) - if err != nil { - return nil, errors.Wrap(err, "ioutil readfile") - } - - cfg := &perf.Config{} - - if err = json.Unmarshal(data, cfg); err != nil { - return nil, errors.Wrap(err, "unmarshal") - } - - srv, err := perf.NewClient(logger, cfg) - if err != nil { - return nil, errors.Wrap(err, "new allocator server") - } - - return srv, nil -} - -type runnable interface { - Run() error - Quit() -} - -func runAndWaitSignal(r runnable) error { - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) - - errChan := make(chan error, 1) - - go func() { errChan <- r.Run() }() - - defer r.Quit() - - select { - case err := <-errChan: - return errors.Wrap(err, "run error") - case <-signalChan: - return nil - } + a := app.NewApp(logger, app.NewFactory(logger)) + a.Run() } diff --git a/test/allocator/server/server.go b/test/allocator/server/server.go index 0b88c69..79c8845 100644 --- a/test/allocator/server/server.go +++ b/test/allocator/server/server.go @@ -24,6 +24,10 @@ type Server interface { Run() error // Quit корректное завершение работы сервера. Quit() + // GRPCServer returns underlying server implementation. Only for test purposes. + GRPCServer() *grpc.Server + // MemLimiter returns underlying memlimiter implementation. Only for test purposes. + MemLimiter() memlimiter.Service } var _ Server = (*serverImpl)(nil) @@ -33,6 +37,7 @@ type serverImpl struct { cfg *Config logger logr.Logger grpcServer *grpc.Server + ml memlimiter.Service } func (srv *serverImpl) MakeAllocation(_ context.Context, request *schema.MakeAllocationRequest) (*schema.MakeAllocationResponse, error) { @@ -79,19 +84,21 @@ func (srv *serverImpl) Run() error { return nil } +func (srv *serverImpl) GRPCServer() *grpc.Server { return srv.grpcServer } + +func (srv *serverImpl) MemLimiter() memlimiter.Service { return srv.ml } + func (srv *serverImpl) Quit() { srv.logger.Info("terminating server") srv.grpcServer.Stop() } // NewAllocatorServer - конструктор сервера. -func NewAllocatorServer(logger logr.Logger, cfg *Config) (Server, error) { +func NewAllocatorServer(logger logr.Logger, cfg *Config, options ...grpc.ServerOption) (Server, error) { if err := prepare.Prepare(cfg); err != nil { return nil, errors.Wrap(err, "configs prepare") } - srv := &serverImpl{logger: logger, cfg: cfg} - ml, err := memlimiter.NewServiceFromConfig( logger, cfg.MemLimiter, @@ -104,12 +111,17 @@ func NewAllocatorServer(logger logr.Logger, cfg *Config) (Server, error) { return nil, errors.Wrap(err, "new memlimiter from config") } - options := []grpc.ServerOption{ + options = append(options, grpc.UnaryInterceptor(ml.MakeUnaryServerInterceptor()), grpc.StreamInterceptor(ml.MakeStreamServerInterceptor()), - } + ) - srv.grpcServer = grpc.NewServer(options...) + srv := &serverImpl{ + logger: logger, + cfg: cfg, + ml: ml, + grpcServer: grpc.NewServer(options...), + } schema.RegisterAllocatorServer(srv.grpcServer, srv)