diff --git a/endpoints/openapi_v1/bfe_pool/create.go b/endpoints/openapi_v1/bfe_pool/create.go index b793ed8..6e965a2 100644 --- a/endpoints/openapi_v1/bfe_pool/create.go +++ b/endpoints/openapi_v1/bfe_pool/create.go @@ -41,7 +41,7 @@ var _ xreq.Handler = CreateAction // CreateAction action // AUTO GEN BY ctrl, MODIFY AS U NEED func CreateAction(req *http.Request) (interface{}, error) { - param, err := product_pool.NewUpsertParam(req) + param, err := product_pool.NewCreateParam(req) if err != nil { return nil, err } @@ -54,12 +54,19 @@ func CreateAction(req *http.Request) (interface{}, error) { } oneData, err := container.PoolManager.CreateBFEPool(req.Context(), &icluster_conf.PoolParam{ - Name: param.Name, + Name: param.Name, + Type: param.Type, + }, &icluster_conf.PoolInstances{ Instances: product_pool.Instancesc2i(param.Instances), }) if err != nil { return nil, err } - return product_pool.NewOneData(oneData), nil + pism, err := container.PoolInstancesManager.BatchFetchInstances(req.Context(), []*icluster_conf.Pool{oneData}) + if err != nil { + return nil, err + } + + return product_pool.NewOneData(oneData, pism[oneData.Name]), nil } diff --git a/endpoints/openapi_v1/bfe_pool/delete.go b/endpoints/openapi_v1/bfe_pool/delete.go index 2308147..c44441f 100644 --- a/endpoints/openapi_v1/bfe_pool/delete.go +++ b/endpoints/openapi_v1/bfe_pool/delete.go @@ -20,6 +20,7 @@ import ( "github.com/bfenetworks/api-server/endpoints/openapi_v1/product_pool" "github.com/bfenetworks/api-server/lib/xreq" "github.com/bfenetworks/api-server/model/iauth" + "github.com/bfenetworks/api-server/model/icluster_conf" "github.com/bfenetworks/api-server/stateful/container" ) @@ -46,5 +47,10 @@ func DeleteAction(req *http.Request) (interface{}, error) { return nil, err } - return product_pool.NewOneData(oldOne), nil + pism, err := container.PoolInstancesManager.BatchFetchInstances(req.Context(), []*icluster_conf.Pool{oldOne}) + if err != nil { + return nil, err + } + + return product_pool.NewOneData(oldOne, pism[oldOne.Name]), nil } diff --git a/endpoints/openapi_v1/bfe_pool/one.go b/endpoints/openapi_v1/bfe_pool/one.go index 23107d8..6fbdbc3 100644 --- a/endpoints/openapi_v1/bfe_pool/one.go +++ b/endpoints/openapi_v1/bfe_pool/one.go @@ -21,6 +21,7 @@ import ( "github.com/bfenetworks/api-server/lib/xerror" "github.com/bfenetworks/api-server/lib/xreq" "github.com/bfenetworks/api-server/model/iauth" + "github.com/bfenetworks/api-server/model/icluster_conf" "github.com/bfenetworks/api-server/stateful/container" ) @@ -51,6 +52,10 @@ func OneAction(req *http.Request) (interface{}, error) { return nil, xerror.WrapRecordNotExist("Instance Pool") } - return product_pool.NewOneData(one), nil + pism, err := container.PoolInstancesManager.BatchFetchInstances(req.Context(), []*icluster_conf.Pool{one}) + if err != nil { + return nil, err + } + return product_pool.NewOneData(one, pism[one.Name]), nil } diff --git a/endpoints/openapi_v1/bfe_pool/update.go b/endpoints/openapi_v1/bfe_pool/update.go index 8f2b156..a4cb118 100644 --- a/endpoints/openapi_v1/bfe_pool/update.go +++ b/endpoints/openapi_v1/bfe_pool/update.go @@ -39,7 +39,7 @@ var _ xreq.Handler = UpdateAction // UpdateAction action // AUTO GEN BY ctrl, MODIFY AS U NEED func UpdateAction(req *http.Request) (interface{}, error) { - param, err := product_pool.NewUpsertParam(req) + param, err := product_pool.NewUpdateParam(req) if err != nil { return nil, err } @@ -52,11 +52,14 @@ func UpdateAction(req *http.Request) (interface{}, error) { return nil, xerror.WrapRecordNotExist("Instance Pool") } - err = container.PoolManager.UpdateBFEPool(req.Context(), one, &icluster_conf.PoolParam{ + pi := &icluster_conf.PoolInstances{ + Name: one.Name, Instances: product_pool.Instancesc2i(param.Instances), - }) - - one.Instances = product_pool.Instancesc2i(param.Instances) + } + err = container.PoolInstancesManager.UpdateInstances(req.Context(), one, pi) + if err != nil { + return nil, err + } - return product_pool.NewOneData(one), err + return product_pool.NewOneData(one, pi), err } diff --git a/endpoints/openapi_v1/product_pool/create.go b/endpoints/openapi_v1/product_pool/create.go index d33f994..ddf9848 100644 --- a/endpoints/openapi_v1/product_pool/create.go +++ b/endpoints/openapi_v1/product_pool/create.go @@ -18,6 +18,7 @@ import ( "net/http" "strings" + "github.com/bfenetworks/api-server/lib" "github.com/bfenetworks/api-server/lib/xerror" "github.com/bfenetworks/api-server/lib/xreq" "github.com/bfenetworks/api-server/model/iauth" @@ -26,13 +27,6 @@ import ( "github.com/bfenetworks/api-server/stateful/container" ) -// UpsertParam Request Param -// AUTO GEN BY ctrl, MODIFY AS U NEED -type UpsertParam struct { - Name *string `json:"name" uri:"instance_pool_name" validate:"required,min=2"` - Instances []*Instance `json:"instances" uri:"instances" validate:"min=1,dive"` -} - // CreateRoute route // AUTO GEN BY ctrl, MODIFY AS U NEED var CreateEndpoint = &xreq.Endpoint{ @@ -42,9 +36,19 @@ var CreateEndpoint = &xreq.Endpoint{ Authorizer: iauth.FAP(iauth.FeatureProductPool, iauth.ActionCreate), } +// CreateParam Request Param +// AUTO GEN BY ctrl, MODIFY AS U NEED +type CreateParam struct { + Name *string `json:"name" validate:"required,min=2"` + Type *int8 `json:"type" validate:"oneof=1"` + Instances []*Instance `json:"instances" validate:"min=1,dive"` +} + // AUTO GEN BY ctrl, MODIFY AS U NEED -func NewUpsertParam(req *http.Request) (*UpsertParam, error) { - param := &UpsertParam{} +func NewCreateParam(req *http.Request) (*CreateParam, error) { + param := &CreateParam{ + Type: lib.PInt8(icluster_conf.PoolInstancesTypeRDB), + } err := xreq.Bind(req, param) if err != nil { return nil, err @@ -58,7 +62,7 @@ var _ xreq.Handler = CreateAction // CreateAction action // AUTO GEN BY ctrl, MODIFY AS U NEED func CreateAction(req *http.Request) (interface{}, error) { - param, err := NewUpsertParam(req) + param, err := NewCreateParam(req) if err != nil { return nil, err } @@ -80,7 +84,12 @@ func CreateAction(req *http.Request) (interface{}, error) { return nil, err } - return NewOneData(oneData), nil + pism, err := container.PoolInstancesManager.BatchFetchInstances(req.Context(), []*icluster_conf.Pool{oneData}) + if err != nil { + return nil, err + } + + return NewOneData(oneData, pism[oneData.Name]), nil } func Instancesc2i(is []*Instance) []icluster_conf.Instance { @@ -103,9 +112,11 @@ func Instancesc2i(is []*Instance) []icluster_conf.Instance { return rst } -func CreateProcess(req *http.Request, product *ibasic.Product, param *UpsertParam) (*icluster_conf.Pool, error) { +func CreateProcess(req *http.Request, product *ibasic.Product, param *CreateParam) (*icluster_conf.Pool, error) { return container.PoolManager.CreateProductPool(req.Context(), product, &icluster_conf.PoolParam{ - Name: param.Name, + Name: param.Name, + Type: param.Type, + }, &icluster_conf.PoolInstances{ Instances: Instancesc2i(param.Instances), }) } diff --git a/endpoints/openapi_v1/product_pool/delete.go b/endpoints/openapi_v1/product_pool/delete.go index 6396eb6..c52e832 100644 --- a/endpoints/openapi_v1/product_pool/delete.go +++ b/endpoints/openapi_v1/product_pool/delete.go @@ -20,6 +20,7 @@ import ( "github.com/bfenetworks/api-server/lib/xreq" "github.com/bfenetworks/api-server/model/iauth" "github.com/bfenetworks/api-server/model/ibasic" + "github.com/bfenetworks/api-server/model/icluster_conf" "github.com/bfenetworks/api-server/stateful/container" ) @@ -52,5 +53,10 @@ func DeleteAction(req *http.Request) (interface{}, error) { return nil, err } - return NewOneData(oldOne), nil + pism, err := container.PoolInstancesManager.BatchFetchInstances(req.Context(), []*icluster_conf.Pool{oldOne}) + if err != nil { + return nil, err + } + + return NewOneData(oldOne, pism[oldOne.Name]), nil } diff --git a/endpoints/openapi_v1/product_pool/one.go b/endpoints/openapi_v1/product_pool/one.go index bff725b..0a9c5c1 100644 --- a/endpoints/openapi_v1/product_pool/one.go +++ b/endpoints/openapi_v1/product_pool/one.go @@ -48,16 +48,18 @@ type OneData struct { Instances []*Instance `json:"instances" uri:"instances"` } -func NewOneData(pool *icluster_conf.Pool) *OneData { +func NewOneData(pool *icluster_conf.Pool, pis *icluster_conf.PoolInstances) *OneData { is := []*Instance{} - for _, one := range pool.Instances { - is = append(is, &Instance{ - Hostname: one.HostName, - IP: one.IP, - Weight: one.Weight, - Ports: one.Ports, - Tags: one.Tags, - }) + if pis != nil { + for _, one := range pis.Instances { + is = append(is, &Instance{ + Hostname: one.HostName, + IP: one.IP, + Weight: one.Weight, + Ports: one.Ports, + Tags: one.Tags, + }) + } } return &OneData{ @@ -105,5 +107,9 @@ func OneAction(req *http.Request) (interface{}, error) { return nil, xerror.WrapRecordNotExist("Instance Pool") } - return NewOneData(one), nil + pism, err := container.PoolInstancesManager.BatchFetchInstances(req.Context(), []*icluster_conf.Pool{one}) + if err != nil { + return nil, err + } + return NewOneData(one, pism[one.Name]), nil } diff --git a/endpoints/openapi_v1/product_pool/update.go b/endpoints/openapi_v1/product_pool/update.go index 1030122..8553bef 100644 --- a/endpoints/openapi_v1/product_pool/update.go +++ b/endpoints/openapi_v1/product_pool/update.go @@ -25,6 +25,24 @@ import ( "github.com/bfenetworks/api-server/stateful/container" ) +// UpdateParam Request Param +// AUTO GEN BY ctrl, MODIFY AS U NEED +type UpdateParam struct { + Name *string `uri:"instance_pool_name" validate:"required,min=2"` + Instances []*Instance `json:"instances" validate:"min=1,dive"` +} + +// AUTO GEN BY ctrl, MODIFY AS U NEED +func NewUpdateParam(req *http.Request) (*UpdateParam, error) { + param := &UpdateParam{} + err := xreq.Bind(req, param) + if err != nil { + return nil, err + } + + return param, err +} + // UpdateRoute route // AUTO GEN BY ctrl, MODIFY AS U NEED var UpdateEndpoint = &xreq.Endpoint{ @@ -39,7 +57,7 @@ var _ xreq.Handler = UpdateAction // UpdateAction action // AUTO GEN BY ctrl, MODIFY AS U NEED func UpdateAction(req *http.Request) (interface{}, error) { - param, err := NewUpsertParam(req) + param, err := NewCreateParam(req) if err != nil { return nil, err } @@ -56,11 +74,14 @@ func UpdateAction(req *http.Request) (interface{}, error) { return nil, xerror.WrapRecordNotExist("Instance Pool") } - err = container.PoolManager.UpdateProductPool(req.Context(), product, one, &icluster_conf.PoolParam{ + pi := &icluster_conf.PoolInstances{ + Name: one.Name, Instances: Instancesc2i(param.Instances), - }) - - one.Instances = Instancesc2i(param.Instances) + } + err = container.PoolInstancesManager.UpdateInstances(req.Context(), one, pi) + if err != nil { + return nil, err + } - return NewOneData(one), err + return NewOneData(one, pi), nil } diff --git a/model/icluster_conf/cluster.go b/model/icluster_conf/cluster.go index ac64793..1f1e9d8 100644 --- a/model/icluster_conf/cluster.go +++ b/model/icluster_conf/cluster.go @@ -236,6 +236,7 @@ func ClusterList2MapByID(list []*Cluster) map[int64]*Cluster { func NewClusterManager(txn itxn.TxnStorager, storager ClusterStorager, subClusterStorager SubClusterStorager, bfeClusterStorager ibasic.BFEClusterStorager, + poolInstancesManager *PoolInstancesManager, versionControlManager *iversion_control.VersionControlManager, deleteCheckers map[string]func(context.Context, *ibasic.Product, *Cluster) error) *ClusterManager { @@ -244,6 +245,7 @@ func NewClusterManager(txn itxn.TxnStorager, storager ClusterStorager, storager: storager, subClusterStorager: subClusterStorager, bfeClusterStorager: bfeClusterStorager, + poolInstancesManager: poolInstancesManager, versionControlManager: versionControlManager, deleteCheckers: deleteCheckers, @@ -266,6 +268,7 @@ type ClusterManager struct { subClusterStorager SubClusterStorager bfeClusterStorager ibasic.BFEClusterStorager + poolInstancesManager *PoolInstancesManager versionControlManager *iversion_control.VersionControlManager deleteCheckers map[string]func(context.Context, *ibasic.Product, *Cluster) error diff --git a/model/icluster_conf/exporter.go b/model/icluster_conf/exporter.go index 98bf92e..d813b47 100644 --- a/model/icluster_conf/exporter.go +++ b/model/icluster_conf/exporter.go @@ -45,17 +45,37 @@ func (rm *ClusterManager) clusterTableConfGenerator(ctx context.Context) (*ivers return nil, err } + pools := map[string]*Pool{} + for _, cluster := range clusters { + for _, subcluster := range cluster.SubClusters { + if subcluster.InstancePool != nil { + pools[subcluster.InstancePool.Name] = subcluster.InstancePool + } + } + } + + // maybe rpc in db transaction + piMap, err := rm.poolInstancesManager.BatchFetchInstances(ctx, PoolMap2List(pools)) + if err != nil { + return nil, err + } + allClusters := cluster_table_conf.AllClusterBackend{} for _, cluster := range clusters { clusterBackend := map[string]cluster_table_conf.SubClusterBackend{} for _, subCluster := range cluster.SubClusters { - if subCluster.InstancePool == nil || len(subCluster.InstancePool.Instances) == 0 { + if subCluster.InstancePool == nil { + continue + } + + pi := piMap[subCluster.InstancePool.Name] + if pi == nil || len(pi.Instances) == 0 { continue } - subClusterBackend := make(cluster_table_conf.SubClusterBackend, 0, len(subCluster.InstancePool.Instances)) - for _, instance := range subCluster.InstancePool.Instances { + subClusterBackend := make(cluster_table_conf.SubClusterBackend, 0, len(pi.Instances)) + for _, instance := range pi.Instances { subClusterBackend = append(subClusterBackend, &cluster_table_conf.BackendConf{ Name: lib.PString(instance.HostName), Addr: lib.PString(instance.IP), diff --git a/model/icluster_conf/pool.go b/model/icluster_conf/pool.go index c204768..56817f9 100644 --- a/model/icluster_conf/pool.go +++ b/model/icluster_conf/pool.go @@ -16,7 +16,6 @@ package icluster_conf import ( "context" - "fmt" "strings" "github.com/bfenetworks/api-server/lib/xerror" @@ -40,37 +39,30 @@ type PoolParam struct { ID *int64 Name *string ProductID *int64 - Instances []Instance - - Tag *int8 + Type *int8 + Tag *int8 } type Pool struct { - ID int64 - Name string - Type int8 - Ready bool - Product *ibasic.Product - Instances []Instance - Tag int8 + ID int64 + Name string + Type int8 + Ready bool + Tag int8 + Product *ibasic.Product + + instances []Instance } -type Instance struct { - HostName string `json:"Name"` - IP string `json:"Addr"` - Port int `json:"Port"` - Ports map[string]int `json:"Ports,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Weight int64 `json:"Weight"` - Disable bool `json:"Disable"` +func (p *Pool) SetDefaultInstances(is []Instance) { + p.instances = is } -func (i *Instance) IPWithPort() string { - if i.Port == 0 { - i.Port = i.Ports["Default"] +func (p *Pool) GetDefaultInstances() *PoolInstances { + return &PoolInstances{ + Name: p.Name, + Instances: p.instances, } - - return fmt.Sprintf("%s:%d", i.IP, i.Port) } type PoolStorager interface { @@ -87,16 +79,21 @@ type PoolManager struct { bfeClusterStorager ibasic.BFEClusterStorager subClusterStorager SubClusterStorager txn itxn.TxnStorager + + poolInstancesManager *PoolInstancesManager } func NewPoolManager(txn itxn.TxnStorager, storager PoolStorager, - bfeClusterStorager ibasic.BFEClusterStorager, subClusterStorager SubClusterStorager) *PoolManager { + bfeClusterStorager ibasic.BFEClusterStorager, subClusterStorager SubClusterStorager, + poolInstancesManager *PoolInstancesManager) *PoolManager { return &PoolManager{ txn: txn, storager: storager, bfeClusterStorager: bfeClusterStorager, subClusterStorager: subClusterStorager, + + poolInstancesManager: poolInstancesManager, } } @@ -123,7 +120,7 @@ func (rppm *PoolManager) FetchProductPool(ctx context.Context, product *ibasic.P one, err = rppm.storager.FetchPool(ctx, name) return err }) - + return } @@ -212,12 +209,12 @@ func (rppm *PoolManager) DeleteProductPool(ctx context.Context, product *ibasic. return } -func (rppm *PoolManager) CreateBFEPool(ctx context.Context, pool *PoolParam) (one *Pool, err error) { +func (rppm *PoolManager) CreateBFEPool(ctx context.Context, pool *PoolParam, pis *PoolInstances) (one *Pool, err error) { pool.Tag = &PoolTagBFE - return rppm.CreateProductPool(ctx, ibasic.BuildinProduct, pool) + return rppm.CreateProductPool(ctx, ibasic.BuildinProduct, pool, pis) } -func (rppm *PoolManager) CreateProductPool(ctx context.Context, product *ibasic.Product, pool *PoolParam) (one *Pool, err error) { +func (rppm *PoolManager) CreateProductPool(ctx context.Context, product *ibasic.Product, pool *PoolParam, pis *PoolInstances) (one *Pool, err error) { var pN string pN, err = poolNameJudger(product.Name, *pool.Name) if err != nil { @@ -238,6 +235,13 @@ func (rppm *PoolManager) CreateProductPool(ctx context.Context, product *ibasic. } one, err = rppm.storager.CreatePool(ctx, product, pool) + if err != nil { + return err + } + + if pis != nil { + err = rppm.poolInstancesManager.UpdateInstances(ctx, one, pis) + } return err }) diff --git a/model/icluster_conf/pool_instance.go b/model/icluster_conf/pool_instance.go new file mode 100644 index 0000000..326924a --- /dev/null +++ b/model/icluster_conf/pool_instance.go @@ -0,0 +1,112 @@ +// Copyright (c) 2021 The BFE Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package icluster_conf + +import ( + "context" + "fmt" + + "github.com/bfenetworks/api-server/lib/xerror" +) + +type Instance struct { + HostName string `json:"Name"` + IP string `json:"Addr"` + Port int `json:"Port"` + Ports map[string]int `json:"Ports,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Weight int64 `json:"Weight"` + Disable bool `json:"Disable"` +} + +func (i *Instance) IPWithPort() string { + if i.Port == 0 { + i.Port = i.Ports["Default"] + } + + return fmt.Sprintf("%s:%d", i.IP, i.Port) +} + +type PoolInstances struct { + Name string + Instances []Instance +} + +const ( + PoolInstancesTypeRDB int8 = 1 +) + +type PoolInstanceStorager interface { + UpdateInstances(context.Context, *Pool, *PoolInstances) error + + BatchFetchInstances(context.Context, []*Pool) (map[string]*PoolInstances, error) +} + +type PoolInstancesManager struct { + poolInstanceStorages map[int8]PoolInstanceStorager +} + +func NewPoolInstancesManager(poolInstanceStorages map[int8]PoolInstanceStorager) *PoolInstancesManager { + return &PoolInstancesManager{ + poolInstanceStorages: poolInstanceStorages, + } +} + +func (pim *PoolInstancesManager) BatchFetchInstances(ctx context.Context, pools []*Pool) (map[string]*PoolInstances, error) { + type2PoolInstancesList := map[int8][]*Pool{} + for _, one := range pools { + type2PoolInstancesList[one.Type] = append(type2PoolInstancesList[one.Type], one) + } + + for typ := range type2PoolInstancesList { + _, ok := pim.poolInstanceStorages[typ] + if !ok { + return nil, xerror.WrapModelErrorWithMsg("Type %d not register Storager", typ) + } + } + + rst := map[string]*PoolInstances{} + for typ, pisList := range type2PoolInstancesList { + storager := pim.poolInstanceStorages[typ] + r, err := storager.BatchFetchInstances(ctx, pisList) + if err != nil { + return nil, err + } + + for name, pis := range r { + rst[name] = pis + } + } + + return rst, nil +} + +func (pim *PoolInstancesManager) UpdateInstances(ctx context.Context, pool *Pool, pis *PoolInstances) error { + storager, ok := pim.poolInstanceStorages[pool.Type] + if !ok { + return xerror.WrapModelErrorWithMsg("Type %d not register Storager", pool.Type) + } + + return storager.UpdateInstances(ctx, pool, pis) +} + +func PoolMap2List(m map[string]*Pool) []*Pool { + var r []*Pool + for _, one := range m { + r = append(r, one) + } + + return r +} diff --git a/stateful/container/components.go b/stateful/container/components.go index 2b96061..d065157 100644 --- a/stateful/container/components.go +++ b/stateful/container/components.go @@ -51,4 +51,5 @@ var ( AuthenticateManager *iauth.AuthenticateManager AuthorizeManager *iauth.AuthorizeManager PoolManager *icluster_conf.PoolManager + PoolInstancesManager *icluster_conf.PoolInstancesManager ) diff --git a/stateful/container/rdb/components.go b/stateful/container/rdb/components.go index 85710be..ad83945 100644 --- a/stateful/container/rdb/components.go +++ b/stateful/container/rdb/components.go @@ -90,11 +90,16 @@ func Init(registerServier *register.RegisterServier) { container.VersionControlManager, container.DomainStoragerSingleton) + container.PoolInstancesManager = icluster_conf.NewPoolInstancesManager(map[int8]icluster_conf.PoolInstanceStorager{ + icluster_conf.PoolInstancesTypeRDB: cluster_conf.NewRDBPoolInstanceStorager(stateful.NewBFEDBContext), + }) + container.ClusterManager = icluster_conf.NewClusterManager( container.TxnStoragerSingleton, container.ClusterStoragerSingleton, container.SubClusterStoragerSingleton, container.BFEClusterStoragerSingleton, + container.PoolInstancesManager, container.VersionControlManager, map[string]func(context.Context, *ibasic.Product, *icluster_conf.Cluster) error{ "rules": container.RouteRuleManager.ClusterDeleteChecker, @@ -125,5 +130,6 @@ func Init(registerServier *register.RegisterServier) { container.TxnStoragerSingleton, container.PoolStoragerSingleton, container.BFEClusterStoragerSingleton, - container.SubClusterStoragerSingleton) + container.SubClusterStoragerSingleton, + container.PoolInstancesManager) } diff --git a/storage/rdb/cluster_conf/pool.go b/storage/rdb/cluster_conf/pool.go index 7b7c481..63d16bc 100644 --- a/storage/rdb/cluster_conf/pool.go +++ b/storage/rdb/cluster_conf/pool.go @@ -63,22 +63,12 @@ func poolParami2d(data *icluster_conf.PoolParam) (*dao.TPoolsParam, error) { return nil, nil } - var detail *string - if data.Instances != nil { - bs, err := json.Marshal(data.Instances) - if err != nil { - return nil, xerror.WrapParamErrorWithMsg("Instances Marshal, err: %s", err) - } - - detail = lib.PString(string(bs)) - } - return &dao.TPoolsParam{ - Id: data.ID, - Name: data.Name, - ProductID: data.ProductID, - InstanceDetail: detail, - Tag: data.Tag, + Id: data.ID, + Name: data.Name, + ProductID: data.ProductID, + Type: data.Type, + Tag: data.Tag, }, nil } @@ -96,16 +86,13 @@ func (rpps *RDBPoolStorager) CreatePool(ctx context.Context, product *ibasic.Pro return nil, err } - newID, err := dao.TPoolsCreate(dbCtx, param) + _, err = dao.TPoolsCreate(dbCtx, param) if err != nil { return nil, err } - return &icluster_conf.Pool{ - ID: newID, - Name: *data.Name, - Instances: data.Instances, - }, nil + return rpps.FetchPool(ctx, *param.Name) + } func (rpps *RDBPoolStorager) FetchPool(ctx context.Context, name string) (*icluster_conf.Pool, error) { @@ -133,12 +120,15 @@ func newPool(pp *dao.TPools, product *ibasic.Product) (*icluster_conf.Pool, erro Tag: pp.Tag, } + // get default instance list if pp.InstanceDetail == "" || pp.InstanceDetail == "NULL" { pp.InstanceDetail = "[]" } - if err := json.Unmarshal([]byte(pp.InstanceDetail), &data.Instances); err != nil { + is := []icluster_conf.Instance{} + if err := json.Unmarshal([]byte(pp.InstanceDetail), &is); err != nil { return nil, xerror.WrapDirtyDataErrorWithMsg("pool %s, raw: %s, err: %v", pp.Name, pp.InstanceDetail, err) } + data.SetDefaultInstances(is) return data, nil } @@ -180,7 +170,7 @@ func (rpps *RDBPoolStorager) FetchPools(ctx context.Context, filter *icluster_co } rst = append(rst, p) } - rpps.registerServier.GetRegisteredInstance(rst) + // rpps.registerServier.GetRegisteredInstance(rst) return rst, nil } diff --git a/storage/rdb/cluster_conf/pool_instance.go b/storage/rdb/cluster_conf/pool_instance.go new file mode 100644 index 0000000..0ed6af7 --- /dev/null +++ b/storage/rdb/cluster_conf/pool_instance.go @@ -0,0 +1,77 @@ +// Copyright (c) 2021 The BFE Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster_conf + +import ( + "context" + "encoding/json" + + "github.com/bfenetworks/api-server/lib" + "github.com/bfenetworks/api-server/lib/xerror" + "github.com/bfenetworks/api-server/model/icluster_conf" + "github.com/bfenetworks/api-server/storage/rdb/internal/dao" +) + +type RDBPoolInstanceStorager struct { + dbCtxFactory lib.DBContextFactory +} + +func NewRDBPoolInstanceStorager(dbCtxFactory lib.DBContextFactory) *RDBPoolInstanceStorager { + return &RDBPoolInstanceStorager{ + dbCtxFactory: dbCtxFactory, + } +} + +var _ icluster_conf.PoolInstanceStorager = &RDBPoolInstanceStorager{} + +func (rpps *RDBPoolInstanceStorager) UpdateInstances(ctx context.Context, pool *icluster_conf.Pool, + pis *icluster_conf.PoolInstances) error { + + var detail *string + if pis.Instances != nil { + bs, err := json.Marshal(pis.Instances) + if err != nil { + return xerror.WrapParamErrorWithMsg("Instances Marshal, err: %s", err) + } + + detail = lib.PString(string(bs)) + } + + dbCtx, err := rpps.dbCtxFactory(ctx) + if err != nil { + return err + } + _, err = dao.TPoolsUpdate(dbCtx, &dao.TPoolsParam{ + InstanceDetail: detail, + }, &dao.TPoolsParam{ + Id: &pool.ID, + }) + + return err +} + +func (rpps *RDBPoolInstanceStorager) BatchFetchInstances(ctx context.Context, + poolList []*icluster_conf.Pool) (map[string]*icluster_conf.PoolInstances, error) { + + m := map[string]*icluster_conf.PoolInstances{} + for _, one := range poolList { + // because of RDBPoolStorager.FetchPools will get pool list + // it's trick + pi := one.GetDefaultInstances() + m[pi.Name] = pi + } + + return m, nil +} diff --git a/storage/register/regsiter.go b/storage/register/regsiter.go index bd5c4ac..e1df04e 100644 --- a/storage/register/regsiter.go +++ b/storage/register/regsiter.go @@ -15,8 +15,6 @@ package register import ( - "strings" - "github.com/bfenetworks/api-server/model/icluster_conf" "github.com/bfenetworks/api-server/stateful" register "github.com/bfenetworks/api-server/storage/register/nacos" @@ -49,15 +47,16 @@ func (registerServier *RegisterServier) Init() { } func (registerServier *RegisterServier) GetRegisteredInstance(pools []*icluster_conf.Pool) { - for _, pool := range pools { - if registerType, ok := registerServier.TypeMapper[int(pool.Type)]; ok { - registerObject, _ok := registerServier.RegisterExample[registerType] - if !_ok { - continue - } - name := pool.Name[strings.Index(pool.Name, ".")+1:] - instances, _ := registerObject.GetInstance(name) - pool.Instances = instances - } - } + panic("") + // for _, pool := range pools { + // if registerType, ok := registerServier.TypeMapper[int(pool.Type)]; ok { + // registerObject, _ok := registerServier.RegisterExample[registerType] + // if !_ok { + // continue + // } + // name := pool.Name[strings.Index(pool.Name, ".")+1:] + // instances, _ := registerObject.GetInstance(name) + // pool.Instances = instances + // } + // } }