Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 72 additions & 81 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,30 +96,6 @@ func New(
}
}

// handleVMInstanceCreation drives the VM-creation promise for the given NodeClaim:
// synchronously (blocking Wait) for standalone NodeClaims, asynchronously (monitor
// goroutine) for NodePool-managed ones. Returns an error only on the synchronous path.
func (c *CloudProvider) handleVMInstanceCreation(ctx context.Context, vmPromise *instance.VirtualMachinePromise, nodeClaim *karpv1.NodeClaim) error {
	if c.isStandaloneNodeClaim(nodeClaim) {
		// See processStandaloneNodeClaimDeletion:
		// Standalone NodeClaims aren't re-queued for reconciliation in the provision_trigger controller,
		// so we delete them synchronously. After marking Launched=true,
		// their status can't be reverted to false once the delete completes due to how core caches nodeclaims in
		// the launch controller. This ensures we retry continuously until we hit the registration TTL.
		err := vmPromise.Wait()
		if err != nil {
			// removeNodeClaim=false: the caller owns the standalone claim's lifecycle;
			// handleNodeClaimCreationError returns a CreateError on this path.
			return c.handleNodeClaimCreationError(ctx, err, vmPromise, nodeClaim, false)
		}
	} else {
		// For NodePool-managed nodeclaims, launch a single goroutine to poll the returned promise.
		// Note that we could store the LRO details on the NodeClaim, but we don't bother today because Karpenter
		// crashes should be rare, and even in the case of a crash, as long as the node comes up successfully there's
		// no issue. If the node doesn't come up successfully in that case, the node and the linked claim will
		// be garbage collected after the TTL, but the cause of the node's issue will be lost, as the LRO URL was
		// only held in memory.
		go c.waitOnPromise(ctx, vmPromise, nodeClaim)
	}
	return nil
}

func (c *CloudProvider) validateNodeClass(nodeClass *v1beta1.AKSNodeClass) error {
nodeClassReady := nodeClass.StatusConditions().Get(status.ConditionReady)
if nodeClassReady.IsFalse() {
Expand Down Expand Up @@ -169,12 +145,17 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *karpv1.NodeClaim)
if len(instanceTypes) == 0 {
return nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("all requested instance types were unavailable during launch"))
}

return c.createVMInstance(ctx, nodeClass, nodeClaim, instanceTypes)
}

func (c *CloudProvider) createVMInstance(ctx context.Context, nodeClass *v1beta1.AKSNodeClass, nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) (*karpv1.NodeClaim, error) {
vmPromise, err := c.vmInstanceProvider.BeginCreate(ctx, nodeClass, nodeClaim, instanceTypes)
if err != nil {
return nil, cloudprovider.NewCreateError(fmt.Errorf("creating instance failed, %w", err), CreateInstanceFailedReason, truncateMessage(err.Error()))
}

if err = c.handleVMInstanceCreation(ctx, vmPromise, nodeClaim); err != nil {
if err := c.handleInstancePromise(ctx, vmPromise, nodeClaim); err != nil {
return nil, err
}

Expand All @@ -183,37 +164,82 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *karpv1.NodeClaim)
instanceType, _ := lo.Find(instanceTypes, func(i *cloudprovider.InstanceType) bool {
return i.Name == string(lo.FromPtr(vm.Properties.HardwareProfile.VMSize))
})

nc, err := c.vmInstanceToNodeClaim(ctx, vm, instanceType)
newNodeClaim, err := c.vmInstanceToNodeClaim(ctx, vm, instanceType)
if err != nil {
return nil, err
}
if err := setAdditionalAnnotationsForNewNodeClaim(ctx, nc, nodeClass); err != nil {
if err := setAdditionalAnnotationsForNewNodeClaim(ctx, newNodeClaim, nodeClass); err != nil {
return nil, err
}
return nc, nil
return newNodeClaim, nil
}

func (c *CloudProvider) waitOnPromise(ctx context.Context, vmPromise *instance.VirtualMachinePromise, nodeClaim *karpv1.NodeClaim) {
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("%v", r)
log.FromContext(ctx).Error(err, "panic during waitOnPromise")
// handleInstancePromise handles the instance promise, primarily deciding on sync/async provisioning.
func (c *CloudProvider) handleInstancePromise(ctx context.Context, instancePromise instance.Promise, nodeClaim *karpv1.NodeClaim) error {
if isNodeClaimStandalone(nodeClaim) {
// Standalone NodeClaims aren't re-queued for reconciliation in the provision_trigger controller,
// so we delete them synchronously. After marking Launched=true,
// their status can't be reverted to false once the delete completes due to how core caches nodeclaims in
// the launch controller. This ensures we retry continuously until we hit the registration TTL
err := instancePromise.Wait()
if err != nil {
c.handleInstancePromiseWaitError(ctx, instancePromise, nodeClaim, err)
return cloudprovider.NewCreateError(fmt.Errorf("creating standalone instance failed, %w", err), CreateInstanceFailedReason, truncateMessage(err.Error()))
}
}()
}
// For NodePool-managed nodeclaims, launch a single goroutine to poll the returned promise.
// Note that we could store the LRO details on the NodeClaim, but we don't bother today because Karpenter
// crashes should be rare, and even in the case of a crash, as long as the node comes up successfully there's
// no issue. If the node doesn't come up successfully in that case, the node and the linked claim will
// be garbage collected after the TTL, but the cause of the nodes issue will be lost, as the LRO URL was
// only held in memory.
go func() {
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("%v", r)
log.FromContext(ctx).Error(err, "panic during waiting on instance promise")
}
}()

err := vmPromise.Wait()
err := instancePromise.Wait()

// Wait until the claim is Launched, to avoid racing with creation.
// This isn't strictly required, but without this, failure test scenarios are harder
// to write because the nodeClaim gets deleted by error handling below before
// the EnsureApplied call finishes, so EnsureApplied creates it again (which is wrong/isn't how
// it would actually happen in production).
c.waitUntilLaunched(ctx, nodeClaim)
// Wait until the claim is Launched, to avoid racing with creation.
// This isn't strictly required, but without this, failure test scenarios are harder
// to write because the nodeClaim gets deleted by error handling below before
// the EnsureApplied call finishes, so EnsureApplied creates it again (which is wrong/isn't how
// it would actually happen in production).
c.waitUntilLaunched(ctx, nodeClaim)

if err != nil {
_ = c.handleNodeClaimCreationError(ctx, err, vmPromise, nodeClaim, true)
return
if err != nil {
c.handleInstancePromiseWaitError(ctx, instancePromise, nodeClaim, err)

// For async provisioning, also delete the NodeClaim
if deleteErr := c.kubeClient.Delete(ctx, nodeClaim); deleteErr != nil {
deleteErr = client.IgnoreNotFound(deleteErr)
if deleteErr != nil {
log.FromContext(ctx).Error(deleteErr, "failed to delete nodeclaim, will wait for liveness TTL", "NodeClaim", nodeClaim.Name)
}
}
metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{
metrics.ReasonLabel: "async_provisioning",
metrics.NodePoolLabel: nodeClaim.Labels[karpv1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[karpv1.CapacityTypeLabelKey],
})
}
}()
return nil
}

// handleInstancePromiseWaitError records a failed instance launch (event + log)
// and best-effort deletes the underlying instance via the promise's Cleanup.
func (c *CloudProvider) handleInstancePromiseWaitError(ctx context.Context, instancePromise instance.Promise, nodeClaim *karpv1.NodeClaim, waitErr error) {
	logger := log.FromContext(ctx)

	c.recorder.Publish(cloudproviderevents.NodeClaimFailedToRegister(nodeClaim, waitErr))
	logger.Error(waitErr, "failed launching nodeclaim")

	// If cleanup fails (for reasons other than the claim already being gone),
	// log it and fall back to the garbage collector to remove any surviving instance.
	if cleanupErr := instancePromise.Cleanup(ctx); cleanupErr != nil && cloudprovider.IgnoreNodeClaimNotFoundError(cleanupErr) != nil {
		logger.Error(cleanupErr, "failed to delete instance", "instanceName", instancePromise.GetInstanceName())
	}
}

Expand All @@ -240,41 +266,6 @@ func (c *CloudProvider) waitUntilLaunched(ctx context.Context, nodeClaim *karpv1
}
}

// handleNodeClaimCreationError handles common error processing for both standalone and
// async node claim creation failures. The sequence is order-dependent: publish the
// registration-failure event, log, delete the VM, then (async path only) delete the
// NodeClaim and record the disruption metric.
//
// removeNodeClaim selects the path: true for async (NodePool-managed) provisioning,
// which deletes the NodeClaim and returns nil; false for standalone claims, which
// keeps the claim and returns a CreateError for the caller to surface.
func (c *CloudProvider) handleNodeClaimCreationError(ctx context.Context, err error, vmPromise *instance.VirtualMachinePromise, nodeClaim *karpv1.NodeClaim, removeNodeClaim bool) error {
	c.recorder.Publish(cloudproviderevents.NodeClaimFailedToRegister(nodeClaim, err))
	log.FromContext(ctx).Error(err, "failed launching nodeclaim")

	// Clean up the VM; a NodeClaimNotFound result means it is already gone, which is fine.
	// TODO: This won't clean up leaked NICs if the VM doesn't exist... intentional?
	vmName := lo.FromPtr(vmPromise.VM.Name)
	deleteErr := c.vmInstanceProvider.Delete(ctx, vmName)
	if cloudprovider.IgnoreNodeClaimNotFoundError(deleteErr) != nil {
		log.FromContext(ctx).Error(deleteErr, "failed to delete VM", "vmName", vmName)
	}

	// For async provisioning, also delete the NodeClaim. Deletion failure is logged
	// only; the liveness TTL will eventually garbage-collect the claim.
	if removeNodeClaim {
		if deleteErr := c.kubeClient.Delete(ctx, nodeClaim); deleteErr != nil {
			deleteErr = client.IgnoreNotFound(deleteErr)
			if deleteErr != nil {
				log.FromContext(ctx).Error(deleteErr, "failed to delete nodeclaim, will wait for liveness TTL", "NodeClaim", nodeClaim.Name)
			}
		}
		metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{
			metrics.ReasonLabel:       "async_provisioning",
			metrics.NodePoolLabel:     nodeClaim.Labels[karpv1.NodePoolLabelKey],
			metrics.CapacityTypeLabel: nodeClaim.Labels[karpv1.CapacityTypeLabelKey],
		})
	}

	// For standalone node claims, return a CreateError so the caller can report it.
	if !removeNodeClaim {
		return cloudprovider.NewCreateError(fmt.Errorf("creating instance failed, %w", err), CreateInstanceFailedReason, truncateMessage(err.Error()))
	}
	return nil
}

func (c *CloudProvider) List(ctx context.Context) ([]*karpv1.NodeClaim, error) {
vmInstances, err := c.vmInstanceProvider.List(ctx)
if err != nil {
Expand Down Expand Up @@ -508,7 +499,7 @@ func GetNodeClaimNameFromVMName(vmName string) string {

const truncateAt = 1200

func (c *CloudProvider) isStandaloneNodeClaim(nodeClaim *karpv1.NodeClaim) bool {
func isNodeClaimStandalone(nodeClaim *karpv1.NodeClaim) bool {
// NodeClaims without the nodepool label are considered standalone
_, hasNodePoolLabel := nodeClaim.Labels[karpv1.NodePoolLabelKey]
return !hasNodePoolLabel
Expand Down
31 changes: 31 additions & 0 deletions pkg/providers/instance/instancepromise.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
Portions Copyright (c) Microsoft Corporation.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package instance

import (
"context"
)

// Promise represents an in-flight instance creation.
// Intended for lifecycle handling on the higher abstractions.
type Promise interface {
	// Cleanup removes the instance from the cloud provider.
	Cleanup(ctx context.Context) error
	// Wait blocks until the instance creation completes, returning any failure.
	Wait() error
	// GetInstanceName returns the name of the instance. Recommended to be used for
	// logging only, due to the generic nature of this interface.
	GetInstanceName() string
}
23 changes: 20 additions & 3 deletions pkg/providers/instance/vminstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,24 @@ func GetManagedExtensionNames(provisionMode string) []string {
type Resource = map[string]interface{}

type VirtualMachinePromise struct {
VM *armcompute.VirtualMachine
Wait func() error
VM *armcompute.VirtualMachine
WaitFunc func() error

providerRef VMProvider
}

// Cleanup deletes the VM backing this promise via the owning provider,
// satisfying the instance.Promise interface.
func (p *VirtualMachinePromise) Cleanup(ctx context.Context) error {
	// NOTE(review): this won't clean up leaked NICs if the VM doesn't exist — confirm intentional.
	// From Delete(): "Leftover network interfaces (if any) will be cleaned by GC controller".
	// Still, we could try to DeleteNic() explicitly here.
	return p.providerRef.Delete(ctx, lo.FromPtr(p.VM.Name))
}

// Wait blocks until VM creation completes by delegating to WaitFunc,
// satisfying the instance.Promise interface.
func (p *VirtualMachinePromise) Wait() error {
	return p.WaitFunc()
}

// GetInstanceName returns the VM's name, satisfying the instance.Promise
// interface; recommended for logging only.
func (p *VirtualMachinePromise) GetInstanceName() string {
	return lo.FromPtr(p.VM.Name)
}

type VMProvider interface {
Expand Down Expand Up @@ -750,7 +766,8 @@ func (p *DefaultVMProvider) beginLaunchInstance(
result.VM.Properties.TimeCreated = lo.ToPtr(time.Now())

return &VirtualMachinePromise{
Wait: func() error {
providerRef: p,
WaitFunc: func() error {
if result.Poller == nil {
// Poller is nil means the VM existed already and we're done.
// TODO: if the VM doesn't have extensions this will still happen and we will have to
Expand Down
Loading