forked from kubernetes/autoscaler
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathazure_scale_set_instance_cache.go
More file actions
268 lines (235 loc) · 11.1 KB
/
azure_scale_set_instance_cache.go
File metadata and controls
268 lines (235 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
/*
- "instanceCache" is included in the scaleSet data structures and holds
status information of the instances / vms. This data is used by the CAS
to make scaleUp / scaleDown decisions based on the current state of
the cluster without making an api call.
- The time for this cache is represented by "instancesRefreshPeriod" which
by default is defaultVmssInstancesRefreshPeriod ~ 5 mins.
- "lastInstanceRefresh" represents the time when the cache was validated
the last time.
- Following methods are defined related to the instanceCache:
- invalidateInstanceCache()
- validateInstanceCache()
- validateInstanceCacheWithoutLock()
- updateInstanceCache()
- getInstanceByProviderID()
- getInstancesByState()
- getInstanceCacheSize()
- setInstanceStatusByProviderID()
- instanceStatusFromVM()
*/
// InstanceCache tracks the VMs in the ScaleSet, in the form of corresponding cloudprovider.Instances.
// This struct also contains related locks and cache interval variables.
type InstanceCache struct {
	// instanceCache tracks the VMs in the ScaleSet, in the form of corresponding cloudprovider.Instances.
	// instanceCache directly backs the efficient response to NodeGroup.Nodes(), implemented by ScaleSet.Nodes().
	// It is periodically updated from VMSS using virtualMachineScaleSetVMsClient.List().
	instanceCache []cloudprovider.Instance
	// instancesRefreshPeriod is how often instance cache is refreshed from VMSS.
	// (Set from VmssVmsCacheTTL or defaultVmssInstancesRefreshPeriod = 5min)
	instancesRefreshPeriod time.Duration
	// lastInstanceRefresh is the time instanceCache was last refreshed from VMSS.
	// Together with instancesRefreshPeriod, it is used to determine if it is time to refresh instanceCache.
	lastInstanceRefresh time.Time
	// instancesRefreshJitter (in seconds) is used to ensure refreshes (which involve expensive List call)
	// don't happen at exactly the same time on all ScaleSets
	instancesRefreshJitter int
	// instanceMutex is used for protecting instance cache from concurrent access
	instanceMutex sync.Mutex
}
// invalidateInstanceCache marks the instanceCache as stale by rewinding
// lastInstanceRefresh one full refresh period into the past, so the next
// validation pass triggers a refresh from VMSS.
func (scaleSet *ScaleSet) invalidateInstanceCache() {
	scaleSet.instanceMutex.Lock()
	defer scaleSet.instanceMutex.Unlock()

	klog.V(3).Infof("invalidating instanceCache for %s", scaleSet.Name)
	// Backdate the last refresh timestamp so the cache is treated as expired.
	expired := time.Now().Add(-scaleSet.instancesRefreshPeriod)
	scaleSet.lastInstanceRefresh = expired
}
// validateInstanceCache refreshes the instanceCache when it has expired.
// It acquires the instance lock and then delegates to the lock-free helper.
func (scaleSet *ScaleSet) validateInstanceCache() error {
	scaleSet.instanceMutex.Lock()
	defer scaleSet.instanceMutex.Unlock()

	return scaleSet.validateInstanceCacheWithoutLock()
}
// validateInstanceCacheWithoutLock refreshes instanceCache from VMSS if its
// TTL (instancesRefreshPeriod) has elapsed since lastInstanceRefresh; it is a
// helper for validateInstanceCache and the get/set methods.
// Caller is responsible for holding instanceMutex.
func (scaleSet *ScaleSet) validateInstanceCacheWithoutLock() error {
	// Idiomatic elapsed-time check; equivalent to the previous
	// lastInstanceRefresh.Add(instancesRefreshPeriod).After(time.Now()).
	if time.Since(scaleSet.lastInstanceRefresh) < scaleSet.instancesRefreshPeriod {
		klog.V(3).Infof("validateInstanceCacheWithoutLock: no need to reset instance Cache for scaleSet %s",
			scaleSet.Name)
		return nil
	}
	return scaleSet.updateInstanceCache()
}
// updateInstanceCache forcefully updates the cache without checking the timer - lastInstanceRefresh.
// It dispatches on the VMSS orchestration mode (Flexible vs Uniform) to build the cache.
// Caller is responsible for acquiring lock on the instanceCache.
func (scaleSet *ScaleSet) updateInstanceCache() error {
	orchestrationMode, err := scaleSet.getOrchestrationMode()
	if err != nil {
		klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, err)
		return err
	}
	// switch is the idiomatic form for this mode dispatch (was an if/else-if chain).
	switch orchestrationMode {
	case compute.Flexible:
		// Flex VMSS support is gated behind a feature flag.
		if !scaleSet.manager.config.EnableVmssFlexNodes {
			return fmt.Errorf("vmss - %q with Flexible orchestration detected but 'enableVmssFlexNodes' feature flag is turned off", scaleSet.Name)
		}
		return scaleSet.buildScaleSetCacheForFlex()
	case compute.Uniform:
		return scaleSet.buildScaleSetCacheForUniform()
	default:
		return fmt.Errorf("failed to determine orchestration mode for vmss %q", scaleSet.Name)
	}
}
// getInstanceByProviderID looks up the cached instance with the given
// providerID. It returns the instance and true when found, a zero instance
// and false when not found, and a non-nil error only if cache validation
// itself failed.
func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool, error) {
	scaleSet.instanceMutex.Lock()
	defer scaleSet.instanceMutex.Unlock()

	// Make sure the cache is fresh before searching it.
	if err := scaleSet.validateInstanceCacheWithoutLock(); err != nil {
		klog.Errorf("getInstanceByProviderID: error validating instanceCache for providerID %s for scaleSet %s, err: %v",
			providerID, scaleSet.Name, err)
		return cloudprovider.Instance{}, false, err
	}

	for _, instance := range scaleSet.instanceCache {
		if instance.Id == providerID {
			return instance, true, nil
		}
	}
	return cloudprovider.Instance{}, false, nil
}
// getInstancesByState returns every cached instance whose status matches the
// given state. Instances with a nil Status are skipped.
func (scaleSet *ScaleSet) getInstancesByState(state cloudprovider.InstanceState) ([]cloudprovider.Instance, error) {
	scaleSet.instanceMutex.Lock()
	defer scaleSet.instanceMutex.Unlock()

	if err := scaleSet.validateInstanceCacheWithoutLock(); err != nil {
		klog.Errorf("getInstancesByState: error validating instanceCache for state %d for scaleSet %s, "+
			"err: %v", state, scaleSet.Name, err)
		return []cloudprovider.Instance{}, err
	}

	matched := []cloudprovider.Instance{}
	for _, instance := range scaleSet.instanceCache {
		if instance.Status == nil {
			continue
		}
		if instance.Status.State == state {
			matched = append(matched, instance)
		}
	}
	return matched, nil
}
// getInstanceCacheSize reports how many instances are currently held in the
// (freshly validated) instanceCache. On validation failure it returns -1 and
// the error.
func (scaleSet *ScaleSet) getInstanceCacheSize() (int64, error) {
	scaleSet.instanceMutex.Lock()
	defer scaleSet.instanceMutex.Unlock()

	if err := scaleSet.validateInstanceCacheWithoutLock(); err != nil {
		klog.Errorf("getInstanceCacheSize: error validating instanceCache for scaleSet: %s, "+
			"err: %v", scaleSet.Name, err)
		return -1, err
	}

	size := len(scaleSet.instanceCache)
	return int64(size), nil
}
// setInstanceStatusByProviderID sets the status for the cached instance with
// the given providerID, refreshing the cache first if it is stale. The whole
// operation runs under the instance lock. A no-op if the providerID is not
// in the cache.
func (scaleSet *ScaleSet) setInstanceStatusByProviderID(providerID string, status cloudprovider.InstanceStatus) {
	scaleSet.instanceMutex.Lock()
	defer scaleSet.instanceMutex.Unlock()

	if err := scaleSet.validateInstanceCacheWithoutLock(); err != nil {
		klog.Errorf("setInstanceStatusByProviderID: error validating instanceCache for providerID %s for "+
			"scaleSet: %s, err: %v", providerID, scaleSet.Name, err)
		// Deliberately not returning: CAS runs with the expectation that a
		// future run will refresh the instance cache.
	}
	for idx := range scaleSet.instanceCache {
		if scaleSet.instanceCache[idx].Id != providerID {
			continue
		}
		klog.V(3).Infof("setInstanceStatusByProviderID: setting instance state for %s for scaleSet "+
			"%s to %d", scaleSet.instanceCache[idx].Id, scaleSet.Name, status.State)
		scaleSet.instanceCache[idx].Status = &status
		break
	}
}
// instanceStatusFromVM converts the VM provisioning state to cloudprovider.InstanceStatus.
// Mapping: Deleting -> InstanceDeleting, Creating -> InstanceCreating, Failed -> InstanceRunning
// (optionally downgraded to InstanceCreating+ErrorInfo when fast delete is enabled and the VM
// is not yet running), anything else -> InstanceRunning. A detected CSE failure overrides the
// result at the end when enableDetailedCSEMessage is set.
// Suggestion: reunify this with instanceStatusFromProvisioningStateAndPowerState() in azure_scale_set.go
func (scaleSet *ScaleSet) instanceStatusFromVM(vm *compute.VirtualMachineScaleSetVM) *cloudprovider.InstanceStatus {
	// Prefer the proactive cache view of the instance state if we aren't in a terminal state
	// This is because the power state may be taking longer to update and we don't want
	// an unfortunate VM update (TTL 5 min) to reset that state to running.
	if vm.ProvisioningState == nil || *vm.ProvisioningState == string(compute.GalleryProvisioningStateUpdating) {
		// NOTE(review): assumes vm.ID is non-nil here (dereferenced without a check) — confirm callers guarantee it.
		resourceID, _ := convertResourceGroupNameToLower(*vm.ID)
		providerID := azurePrefix + resourceID
		for _, instance := range scaleSet.instanceCache {
			if instance.Id == providerID {
				return instance.Status
			}
		}
		// Not in the cache: no status to report for an updating/unknown-provisioning VM.
		return nil
	}
	// Default to "running" power state when the instance view is unavailable.
	powerState := vmPowerStateRunning
	if vm.InstanceView != nil && vm.InstanceView.Statuses != nil {
		powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses)
	}
	status := &cloudprovider.InstanceStatus{}
	switch *vm.ProvisioningState {
	case string(compute.GalleryProvisioningStateDeleting):
		status.State = cloudprovider.InstanceDeleting
	case string(compute.GalleryProvisioningStateCreating):
		status.State = cloudprovider.InstanceCreating
	case string(compute.GalleryProvisioningStateFailed):
		// Failed provisioning is reported as Running by default; see the fast-delete branch below.
		status.State = cloudprovider.InstanceRunning
		klog.V(3).Infof("VM %s reports failed provisioning state with power state: %s, eligible for fast delete: %s", ptr.Deref(vm.ID, ""), powerState, strconv.FormatBool(scaleSet.enableFastDeleteOnFailedProvisioning))
		if scaleSet.enableFastDeleteOnFailedProvisioning {
			// Provisioning can fail both during instance creation or after the instance is running.
			// Per https://learn.microsoft.com/en-us/azure/virtual-machines/states-billing#provisioning-states,
			// ProvisioningState represents the most recent provisioning state, therefore only report
			// InstanceCreating errors when the power state indicates the instance has not yet started running
			if !isRunningVmPowerState(powerState) {
				// This fast deletion relies on the fact that InstanceCreating + ErrorInfo will subsequently trigger a deletion.
				// Could be revisited to rely on something more stable/explicit.
				status.State = cloudprovider.InstanceCreating
				status.ErrorInfo = &cloudprovider.InstanceErrorInfo{
					ErrorClass:   cloudprovider.OutOfResourcesErrorClass,
					ErrorCode:    "provisioning-state-failed",
					ErrorMessage: "Azure failed to provision a node for this node group",
				}
			} else {
				// VM already reached a running power state; keep reporting Running.
				status.State = cloudprovider.InstanceRunning
			}
		}
	default:
		status.State = cloudprovider.InstanceRunning
	}
	// Add vmssCSE Provisioning Failed Message in error info body for vmssCSE Extensions if enableDetailedCSEMessage is true
	if scaleSet.enableDetailedCSEMessage && vm.InstanceView != nil {
		if err, failed := scaleSet.cseErrors(vm.InstanceView.Extensions); failed {
			klog.V(3).Infof("VM %s reports CSE failure: %v, with provisioning state %s, power state %s", ptr.Deref(vm.ID, ""), err, ptr.Deref(vm.ProvisioningState, ""), powerState)
			// CSE failure overrides whatever state the switch above chose.
			status.State = cloudprovider.InstanceCreating
			errorInfo := &cloudprovider.InstanceErrorInfo{
				ErrorClass:   cloudprovider.OtherErrorClass,
				ErrorCode:    vmssExtensionProvisioningFailed,
				ErrorMessage: fmt.Sprintf("%s: %v", ptr.Deref(vm.Name, ""), err),
			}
			status.ErrorInfo = errorInfo
		}
	}
	return status
}