Skip to content

Commit 34e3e2d

Browse files
authored
Support tikv label check (#800)
1 parent 15f8736 commit 34e3e2d

12 files changed

Lines changed: 422 additions & 31 deletions

File tree

components/cluster/command/deploy.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ func newDeploy() *cobra.Command {
9797
cmd.Flags().StringVarP(&opt.IdentityFile, "identity_file", "i", opt.IdentityFile, "The path of the SSH identity file. If specified, public key authentication will be used.")
9898
cmd.Flags().BoolVarP(&opt.UsePassword, "password", "p", false, "Use password of target hosts. If specified, password authentication will be used.")
9999
cmd.Flags().BoolVarP(&opt.IgnoreConfigCheck, "ignore-config-check", "", opt.IgnoreConfigCheck, "Ignore the config check result")
100+
cmd.Flags().BoolVarP(&opt.NoLabels, "no-labels", "", false, "Don't check TiKV labels")
100101

101102
return cmd
102103
}

components/cluster/command/scale_out.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func newScaleOutCmd() *cobra.Command {
7373
cmd.Flags().BoolVarP(&opt.SkipCreateUser, "skip-create-user", "", false, "Skip creating the user specified in topology (experimental).")
7474
cmd.Flags().StringVarP(&opt.IdentityFile, "identity_file", "i", opt.IdentityFile, "The path of the SSH identity file. If specified, public key authentication will be used.")
7575
cmd.Flags().BoolVarP(&opt.UsePassword, "password", "p", false, "Use password of target hosts. If specified, password authentication will be used.")
76+
cmd.Flags().BoolVarP(&opt.NoLabels, "no-labels", "", false, "Don't check TiKV labels")
7677

7778
return cmd
7879
}

examples/topology.example.yaml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ server_configs:
6161
schedule.leader-schedule-limit: 4
6262
schedule.region-schedule-limit: 2048
6363
schedule.replica-schedule-limit: 64
64+
replication.location-labels: ["zone", "host"]
65+
replication.strictly-match-label: true
6466
tiflash:
6567
# path_realtime_mode: false
6668
logger.level: "info"
@@ -112,11 +114,15 @@ tikv_servers:
112114
# log_dir: "/tidb-deploy/tikv-20160/log"
113115
# numa_node: "0,1"
114116
# # The following configs are used to overwrite the `server_configs.tikv` values.
115-
# config:
117+
config:
118+
server.labels: { zone: "zone1", host: "host1" }
116119
# server.grpc-concurrency: 4
117-
# server.labels: { zone: "zone1", dc: "dc1", host: "host1" }
118120
- host: 10.0.1.15
121+
config:
122+
server.labels: { zone: "zone1", host: "host2" }
119123
- host: 10.0.1.16
124+
config:
125+
server.labels: { zone: "zone2", host: "host3" }
120126

121127
tiflash_servers:
122128
- host: 10.0.1.14

pkg/cluster/api/pdapi.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/pingcap/tiup/pkg/logger/log"
3232
"github.com/pingcap/tiup/pkg/utils"
3333
pdserverapi "github.com/tikv/pd/server/api"
34+
pdconfig "github.com/tikv/pd/server/config"
3435
)
3536

3637
// PDClient is an HTTP client of the PD server
@@ -661,6 +662,21 @@ func (pc *PDClient) GetReplicateConfig() ([]byte, error) {
661662
})
662663
}
663664

665+
// GetLocationLabels gets the replication.location-labels config from pd server
666+
func (pc *PDClient) GetLocationLabels() ([]string, error) {
667+
config, err := pc.GetReplicateConfig()
668+
if err != nil {
669+
return nil, err
670+
}
671+
672+
rc := pdconfig.ReplicationConfig{}
673+
if err := json.Unmarshal(config, &rc); err != nil {
674+
return nil, errors.Annotatef(err, "unmarshal replication config: %s", string(config))
675+
}
676+
677+
return rc.LocationLabels, nil
678+
}
679+
664680
// UpdateScheduleConfig updates the PD schedule config
665681
func (pc *PDClient) UpdateScheduleConfig(body io.Reader) error {
666682
return pc.updateConfig(body, pdConfigSchedule)

pkg/cluster/manager.go

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ import (
2828
"path/filepath"
2929
"sort"
3030
"strings"
31+
"time"
3132

3233
"github.com/fatih/color"
3334
"github.com/joomcode/errorx"
3435
perrs "github.com/pingcap/errors"
3536
"github.com/pingcap/tiup/pkg/cliutil"
37+
"github.com/pingcap/tiup/pkg/cluster/api"
3638
"github.com/pingcap/tiup/pkg/cluster/clusterutil"
3739
"github.com/pingcap/tiup/pkg/cluster/executor"
3840
operator "github.com/pingcap/tiup/pkg/cluster/operation"
@@ -184,7 +186,7 @@ func (m *Manager) StopCluster(clusterName string, options operator.Options) erro
184186

185187
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
186188
if err != nil {
187-
return perrs.AddStack(err)
189+
return err
188190
}
189191

190192
t := task.NewBuilder().
@@ -221,7 +223,7 @@ func (m *Manager) RestartCluster(clusterName string, options operator.Options) e
221223

222224
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
223225
if err != nil {
224-
return perrs.AddStack(err)
226+
return err
225227
}
226228

227229
t := task.NewBuilder().
@@ -291,7 +293,7 @@ func (m *Manager) CleanCluster(clusterName string, gOpt operator.Options, cleanO
291293

292294
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
293295
if err != nil {
294-
return perrs.AddStack(err)
296+
return err
295297
}
296298

297299
if !skipConfirm {
@@ -354,7 +356,7 @@ func (m *Manager) DestroyCluster(clusterName string, gOpt operator.Options, dest
354356

355357
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
356358
if err != nil {
357-
return perrs.AddStack(err)
359+
return err
358360
}
359361

360362
if !skipConfirm {
@@ -531,6 +533,10 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
531533
filterRoles := set.NewStringSet(opt.Roles...)
532534
filterNodes := set.NewStringSet(opt.Nodes...)
533535
pdList := topo.BaseTopo().MasterList
536+
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
537+
if err != nil {
538+
return err
539+
}
534540
for _, comp := range topo.ComponentsByStartOrder() {
535541
for _, ins := range comp.Instances() {
536542
// apply role filter
@@ -549,10 +555,6 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
549555
dataDir = insDirs[1]
550556
}
551557

552-
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
553-
if err != nil {
554-
return perrs.AddStack(err)
555-
}
556558
status := ins.Status(tlsCfg, pdList...)
557559
// Query the service status
558560
if status == "-" {
@@ -597,6 +599,17 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
597599
cliutil.PrintTable(clusterTable, true)
598600
fmt.Printf("Total nodes: %d\n", len(clusterTable)-1)
599601

602+
if topo, ok := topo.(*spec.Specification); ok {
603+
// Check if TiKV's label set correctly
604+
kvs := topo.TiKVServers
605+
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
606+
if lbs, err := pdClient.GetLocationLabels(); err != nil {
607+
color.Yellow("\nWARN: get location labels from pd failed: %v", err)
608+
} else if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
609+
color.Yellow("\nWARN: there is something wrong with TiKV labels, which may cause data losing:\n%v", err)
610+
}
611+
}
612+
600613
return nil
601614
}
602615

@@ -751,7 +764,7 @@ func (m *Manager) Reload(clusterName string, opt operator.Options, skipRestart b
751764

752765
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
753766
if err != nil {
754-
return perrs.AddStack(err)
767+
return err
755768
}
756769
if !skipRestart {
757770
tb = tb.Func("UpgradeCluster", func(ctx *task.Context) error {
@@ -896,7 +909,7 @@ func (m *Manager) Upgrade(clusterName string, clusterVersion string, opt operato
896909

897910
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
898911
if err != nil {
899-
return perrs.AddStack(err)
912+
return err
900913
}
901914
t := task.NewBuilder().
902915
SSHKeySet(
@@ -966,7 +979,7 @@ func (m *Manager) Patch(clusterName string, packagePath string, opt operator.Opt
966979

967980
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
968981
if err != nil {
969-
return perrs.AddStack(err)
982+
return err
970983
}
971984
t := task.NewBuilder().
972985
SSHKeySet(
@@ -1002,6 +1015,7 @@ type ScaleOutOptions struct {
10021015
SkipCreateUser bool // don't create user
10031016
IdentityFile string // path to the private key file
10041017
UsePassword bool // use password instead of identity file for ssh connection
1018+
NoLabels bool // don't check labels for TiKV instance
10051019
}
10061020

10071021
// DeployOptions contains the options for scale out.
@@ -1012,6 +1026,7 @@ type DeployOptions struct {
10121026
IdentityFile string // path to the private key file
10131027
UsePassword bool // use password instead of identity file for ssh connection
10141028
IgnoreConfigCheck bool // ignore config check result
1029+
NoLabels bool // don't check labels for TiKV instance
10151030
}
10161031

10171032
// DeployerInstance is a instance can deploy to a target deploy directory.
@@ -1059,6 +1074,18 @@ func (m *Manager) Deploy(
10591074
base.GlobalOptions.SSHType = sshType
10601075
}
10611076

1077+
if topo, ok := topo.(*spec.Specification); ok && !opt.NoLabels {
1078+
// Check if TiKV's label set correctly
1079+
lbs, err := topo.LocationLabels()
1080+
if err != nil {
1081+
return err
1082+
}
1083+
kvs := topo.TiKVServers
1084+
if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
1085+
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
1086+
}
1087+
}
1088+
10621089
clusterList, err := m.specManager.GetAllClusters()
10631090
if err != nil {
10641091
return err
@@ -1406,7 +1433,7 @@ func (m *Manager) ScaleIn(
14061433

14071434
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
14081435
if err != nil {
1409-
return perrs.AddStack(err)
1436+
return err
14101437
}
14111438

14121439
b := task.NewBuilder().
@@ -1480,6 +1507,24 @@ func (m *Manager) ScaleOut(
14801507
return err
14811508
}
14821509

1510+
if topo, ok := topo.(*spec.Specification); ok && !opt.NoLabels {
1511+
// Check if TiKV's label set correctly
1512+
pdList := topo.BaseTopo().MasterList
1513+
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
1514+
if err != nil {
1515+
return err
1516+
}
1517+
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
1518+
lbs, err := pdClient.GetLocationLabels()
1519+
if err != nil {
1520+
return err
1521+
}
1522+
kvs := mergedTopo.(*spec.Specification).TiKVServers
1523+
if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
1524+
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
1525+
}
1526+
}
1527+
14831528
clusterList, err := m.specManager.GetAllClusters()
14841529
if err != nil {
14851530
return err
@@ -1874,7 +1919,7 @@ func buildScaleOutTask(
18741919

18751920
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
18761921
if err != nil {
1877-
return nil, perrs.AddStack(err)
1922+
return nil, err
18781923
}
18791924

18801925
// Initialize the environments

pkg/cluster/spec/server_config.go

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,32 +99,65 @@ func patch(origin map[string]interface{}, key string, val interface{}) {
9999
}
100100
}
101101

102-
func flattenMap(ms map[string]interface{}) (map[string]interface{}, error) {
102+
func flattenMap(ms map[string]interface{}) map[string]interface{} {
103103
result := map[string]interface{}{}
104104
for k, v := range ms {
105105
key, val := flattenKey(k, v)
106106
patch(result, key, val)
107107
}
108-
return result, nil
108+
return result
109109
}
110110

111111
func merge(orig map[string]interface{}, overwrites ...map[string]interface{}) (map[string]interface{}, error) {
112-
lhs, err := flattenMap(orig)
113-
if err != nil {
114-
return nil, err
115-
}
112+
lhs := flattenMap(orig)
116113
for _, overwrite := range overwrites {
117-
rhs, err := flattenMap(overwrite)
118-
if err != nil {
119-
return nil, err
120-
}
114+
rhs := flattenMap(overwrite)
121115
for k, v := range rhs {
122116
patch(lhs, k, v)
123117
}
124118
}
125119
return lhs, nil
126120
}
127121

122+
// GetValueFromPath try to find the value by path recursively
123+
func GetValueFromPath(m map[string]interface{}, p string) interface{} {
124+
ss := strings.Split(p, ".")
125+
126+
searchMap := make(map[interface{}]interface{})
127+
for k, v := range m {
128+
searchMap[k] = v
129+
}
130+
131+
return searchValue(searchMap, ss)
132+
}
133+
134+
func searchValue(m map[interface{}]interface{}, ss []string) interface{} {
135+
l := len(ss)
136+
switch l {
137+
case 0:
138+
return m
139+
case 1:
140+
return m[ss[0]]
141+
}
142+
143+
if m[strings.Join(ss, ".")] != nil {
144+
return m[strings.Join(ss, ".")]
145+
}
146+
147+
for i := l - 1; i > 0; i-- {
148+
key := strings.Join(ss[:i], ".")
149+
if m[key] == nil {
150+
continue
151+
}
152+
if pm, ok := m[key].(map[interface{}]interface{}); ok {
153+
return searchValue(pm, ss[i:])
154+
}
155+
return nil
156+
}
157+
158+
return nil
159+
}
160+
128161
// Merge2Toml merge the config of global.
129162
func Merge2Toml(comp string, global, overwrite map[string]interface{}) ([]byte, error) {
130163
return merge2Toml(comp, global, overwrite)

pkg/cluster/spec/server_config_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,30 @@ server_configs:
3535
decimal = bytes.Contains(get, []byte("0.0"))
3636
c.Assert(decimal, check.IsTrue)
3737
}
38+
39+
func (s *configSuite) TestGetValueFromPath(c *check.C) {
40+
yamlData := []byte(`
41+
server_configs:
42+
tidb:
43+
a.b.c.d: 1
44+
a:
45+
b:
46+
c:
47+
d: 2
48+
a.b:
49+
c.e: 3
50+
a.b.c:
51+
f: 4
52+
h.i.j.k: [1, 2, 3]
53+
`)
54+
55+
topo := new(Specification)
56+
57+
err := yaml.Unmarshal(yamlData, topo)
58+
c.Assert(err, check.IsNil)
59+
60+
c.Assert(GetValueFromPath(topo.ServerConfigs.TiDB, "a.b.c.d"), check.Equals, 1)
61+
c.Assert(GetValueFromPath(topo.ServerConfigs.TiDB, "a.b.c.e"), check.Equals, nil)
62+
c.Assert(GetValueFromPath(topo.ServerConfigs.TiDB, "a.b.c.f"), check.Equals, 4)
63+
c.Assert(GetValueFromPath(topo.ServerConfigs.TiDB, "h.i.j.k"), check.DeepEquals, []interface{}{1, 2, 3})
64+
}

0 commit comments

Comments
 (0)