Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d3ce4fe
[CASCL-1386] Add `evict-legacy-nodes` subcommand to drain non-Datadog…
L3n41c May 18, 2026
4e1f17f
[CASCL-1386] Apply `make fmt` whitespace cleanup
L3n41c May 18, 2026
7ee9656
[CASCL-1386] Test preflight, prompt, and uniqueNodes coverage
L3n41c May 18, 2026
09902c5
[CASCL-1386] Address Codex review
L3n41c May 18, 2026
ba570c0
[CASCL-1386] Move ExtractEC2InstanceID to common/aws so every caller …
L3n41c May 19, 2026
0feea18
[CASCL-1386] Inline table-driven test cases per sibling convention
L3n41c May 19, 2026
97d3ad4
L3n41c May 19, 2026
686ab7d
[CASCL-1386] Drop dead providerID check in evictASG
L3n41c May 19, 2026
9fd1af2
[CASCL-1386] Consolidate evictASG tests into a table-driven `TestEvic…
L3n41c May 19, 2026
891800a
[CASCL-1386] Consolidate evict-package tests into table-driven functions
L3n41c May 19, 2026
8c97dee
[CASCL-1386] Use slices.ContainsFunc in isDaemonSetPod
L3n41c May 19, 2026
5d2eb24
[CASCL-1386] Collapse shouldSkipEviction / podOccupiesNode to a singl…
L3n41c May 19, 2026
de0347d
[CASCL-1386] Use lo.CountBy in waitForNodeEmpty
L3n41c May 19, 2026
db487a4
[CASCL-1386] Switch eviction dispatch to tagless switch
L3n41c May 19, 2026
cd7914a
[CASCL-1386] Set PollInterval at construction, drop dead defaults
L3n41c May 19, 2026
81b1ce3
[CASCL-1386] Drain targets sequentially
L3n41c May 19, 2026
96d5f50
[CASCL-1386] Reformat eviction tests with gofmt
L3n41c May 20, 2026
2f00ddd
[CASCL-1386] Address Codex review on EKS MNG scaling + dry-run
L3n41c May 20, 2026
099620a
[CASCL-1386] Use t.Context() in eviction tests
L3n41c May 27, 2026
db5d0f8
[CASCL-1386] Refactor temp-PDB discovery: struct keys, generics, lo h…
L3n41c Jun 11, 2026
d77c9cc
[CASCL-1386] Cover StatefulSet/bare-RS/DaemonSet controller resolution
L3n41c Jun 11, 2026
7093d74
[CASCL-1386] Reclaim leaked temp PDBs on the no-target path
L3n41c Jun 11, 2026
dd64016
[CASCL-1386] Cover reclaimLeakedTempPDBs cleanup-error branch
L3n41c Jun 11, 2026
e75e7bf
[CASCL-1386] Simplify plan/prompt with stdlib maps/slices and lo helpers
L3n41c Jun 11, 2026
56b081d
[CASCL-1386] Drop redundant defensive copies and nil guard in evict plan
L3n41c Jun 12, 2026
9f7eadd
[CASCL-1386] Terminate drained instances inline during eviction
L3n41c Jun 12, 2026
4c6021f
[CASCL-1386] Cordon all legacy nodes up front before draining
L3n41c Jun 16, 2026
12d37b1
[CASCL-1386] Fold dry-run handling into cordonNode's retry closure
L3n41c Jun 16, 2026
89442cb
[CASCL-1386] Harmonize impact-list bullet styles across evict and uni…
L3n41c Jun 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/kubectl-datadog/autoscaling/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"

"github.com/DataDog/datadog-operator/cmd/kubectl-datadog/autoscaling/cluster/evict"
"github.com/DataDog/datadog-operator/cmd/kubectl-datadog/autoscaling/cluster/install"
"github.com/DataDog/datadog-operator/cmd/kubectl-datadog/autoscaling/cluster/uninstall"
"github.com/DataDog/datadog-operator/cmd/kubectl-datadog/autoscaling/cluster/update"
Expand Down Expand Up @@ -35,6 +36,7 @@ func New(streams genericclioptions.IOStreams) *cobra.Command {
cmd.AddCommand(install.New(streams))
cmd.AddCommand(uninstall.New(streams))
cmd.AddCommand(update.New(streams))
cmd.AddCommand(evict.New(streams))

o := newOptions(streams)
o.configFlags.AddFlags(cmd.Flags())
Expand Down
36 changes: 36 additions & 0 deletions cmd/kubectl-datadog/autoscaling/cluster/common/aws/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package aws

import (
"regexp"

corev1 "k8s.io/api/core/v1"
)

// awsProviderIDRegexp matches the AWS provider ID for EC2-backed nodes.
// Format: aws:///<az>/i-<hex> (e.g. aws:///us-east-1a/i-0abc123def456789).
// The pattern is anchored at both ends and only accepts lowercase hex digits;
// capture group 1 holds the instance ID (the `i-...` part).
// Fargate nodes use a different shape (aws:///<az>/fargate-ip-...) and must
// therefore be classified by label before reaching this regex.
var awsProviderIDRegexp = regexp.MustCompile(`^aws:///[^/]+/(i-[0-9a-f]+)$`)

// LabelEKSNodegroup is the label key EKS stamps on every node that belongs to
// a managed node group; the label value is the node group name. It is exposed
// as a constant so every consumer (classifier, evict-legacy-nodes, future
// code) references the same string instead of repeating the literal.
const LabelEKSNodegroup = "eks.amazonaws.com/nodegroup"

// ExtractEC2InstanceID parses node.Spec.ProviderID and returns the EC2
// instance ID (i-...) it carries. The boolean is false, and the string empty,
// when node is nil or when the providerID does not have the EC2 shape
// `aws:///<az>/i-<hex>` (Fargate uses `aws:///<az>/fargate-ip-...`, GCP/Azure
// use entirely different shapes, etc.).
//
// The helper lives in `common/aws` so that both `common/clusterinfo` (which
// imports `common/karpenter`) and `common/karpenter` (which classifies its
// own nodes) can call it without creating an import cycle.
func ExtractEC2InstanceID(node *corev1.Node) (string, bool) {
	if node == nil {
		return "", false
	}
	// A successful match yields exactly two entries: the whole providerID and
	// the captured instance ID.
	if groups := awsProviderIDRegexp.FindStringSubmatch(node.Spec.ProviderID); len(groups) == 2 {
		return groups[1], true
	}
	return "", false
}
38 changes: 38 additions & 0 deletions cmd/kubectl-datadog/autoscaling/cluster/common/aws/node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package aws

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
)

func TestExtractEC2InstanceID(t *testing.T) {
for _, tc := range []struct {
name string
provider string
wantID string
wantOK bool
}{
{name: "ec2", provider: "aws:///eu-west-3a/i-0123456789abcdef0", wantID: "i-0123456789abcdef0", wantOK: true},
{name: "ec2 short id", provider: "aws:///us-east-1b/i-abc123", wantID: "i-abc123", wantOK: true},
{name: "fargate", provider: "aws:///eu-west-3a/fargate-ip-10-0-1-2", wantOK: false},
{name: "gcp", provider: "gce://project/zone/instance", wantOK: false},
{name: "empty", provider: "", wantOK: false},
{name: "missing prefix", provider: "i-0123456789abcdef0", wantOK: false},
{name: "missing AZ", provider: "aws:////i-0123456789abcdef0", wantOK: false},
} {
t.Run(tc.name, func(t *testing.T) {
node := &corev1.Node{Spec: corev1.NodeSpec{ProviderID: tc.provider}}
id, ok := ExtractEC2InstanceID(node)
assert.Equal(t, tc.wantOK, ok)
if tc.wantOK {
assert.Equal(t, tc.wantID, id)
}
})
}
t.Run("nil node", func(t *testing.T) {
_, ok := ExtractEC2InstanceID(nil)
assert.False(t, ok)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log"
"maps"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -36,11 +35,6 @@ import (
// ValidationError at the API.
const describeASGInstancesMaxIDs = 50

// awsProviderIDRegexp matches the AWS provider ID for EC2-backed nodes.
// Format: aws:///<az>/i-<hex>. Fargate nodes use a different shape and
// must therefore be classified by label before reaching this regex.
var awsProviderIDRegexp = regexp.MustCompile(`^aws:///[^/]+/(i-[0-9a-f]+)$`)

// nodePoolDatadogCreatedLabel is the label set by every Datadog autoscaling
// product (kubectl-datadog AND the cluster agent) on the NodePools they
// manage. Broader than the AND-pair `app.kubernetes.io/managed-by:
Expand Down Expand Up @@ -173,10 +167,9 @@ func classifyByLabels(ctx context.Context, k8sClient kubernetes.Interface, farga
return nil
}

matches := awsProviderIDRegexp.FindStringSubmatch(node.Spec.ProviderID)
if len(matches) == 2 {
if id, ok := commonaws.ExtractEC2InstanceID(node); ok {
candidates = append(candidates, asgCandidate{
instanceID: matches[1],
instanceID: id,
nodeName: node.Name,
})
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log"
"maps"
"regexp"
"slices"
"strings"

Expand All @@ -17,11 +16,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/pager"
)

// awsProviderIDRegexp matches the AWS provider ID format for EC2 instances.
// Format: aws:///ZONE/INSTANCE_ID (e.g., aws:///us-east-1a/i-0abc123def456789)
var awsProviderIDRegexp = regexp.MustCompile(`^aws:///[^/]+/(i-[0-9a-f]+)$`)
commonaws "github.com/DataDog/datadog-operator/cmd/kubectl-datadog/autoscaling/cluster/common/aws"
)

// ec2DescribeBatchSize bounds the number of instance IDs we hand to a single
// ec2:DescribeInstances / ec2:DescribeImages call. The K8s pager streams
Expand Down Expand Up @@ -62,12 +59,12 @@ func GetNodesProperties(ctx context.Context, clientset *kubernetes.Clientset, ec
if _, isKarpenter := node.Labels["karpenter.k8s.aws/ec2nodeclass"]; isKarpenter {
return nil
}
matches := awsProviderIDRegexp.FindStringSubmatch(node.Spec.ProviderID)
if len(matches) != 2 {
id, ok := commonaws.ExtractEC2InstanceID(node)
if !ok {
log.Printf("Skipping node %s with unexpected provider ID: %s", node.Name, node.Spec.ProviderID)
return nil
}
pending[matches[1]] = pendingNode{labels: node.Labels, taints: node.Spec.Taints}
pending[id] = pendingNode{labels: node.Labels, taints: node.Spec.Taints}
if len(pending) >= ec2DescribeBatchSize {
return flush()
}
Expand Down
169 changes: 169 additions & 0 deletions cmd/kubectl-datadog/autoscaling/cluster/evict/asg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package evict

import (
"context"
"errors"
"fmt"
"log"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/autoscaling"
"k8s.io/client-go/kubernetes"

commonaws "github.com/DataDog/datadog-operator/cmd/kubectl-datadog/autoscaling/cluster/common/aws"
)

// AutoscalingAPI lists the *autoscaling.Client methods that evictASG and its
// helpers call. Keeping it as a narrow interface lets unit tests substitute a
// lightweight stub for the AWS SDK client.
type AutoscalingAPI interface {
	// SuspendProcesses is used to suspend the AZRebalance process before any
	// instance is terminated.
	SuspendProcesses(ctx context.Context, params *autoscaling.SuspendProcessesInput, optFns ...func(*autoscaling.Options)) (*autoscaling.SuspendProcessesOutput, error)
	// UpdateAutoScalingGroup is used to set MinSize=0 up front and to lock the
	// group at min=max=desired=0 at the end.
	UpdateAutoScalingGroup(ctx context.Context, params *autoscaling.UpdateAutoScalingGroupInput, optFns ...func(*autoscaling.Options)) (*autoscaling.UpdateAutoScalingGroupOutput, error)
	// TerminateInstanceInAutoScalingGroup is used to terminate one drained
	// instance while decrementing the group's desired capacity.
	TerminateInstanceInAutoScalingGroup(ctx context.Context, params *autoscaling.TerminateInstanceInAutoScalingGroupInput, optFns ...func(*autoscaling.Options)) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error)
}

// evictASG empties one Auto Scaling group in three phases:
//
//  1. Prepare the ASG (suspend AZRebalance, MinSize=0) so instances can be
//     terminated one at a time.
//  2. Cordon every node of the group up front, then drain the cordoned nodes
//     in order. As soon as a node has drained cleanly, its instance is
//     terminated and the ASG's desired capacity is decremented, so capacity is
//     released without waiting for the rest of the group. Cordoning the whole
//     group before any drain (as EKS does for a managed node group) prevents a
//     pod evicted from one node from landing on a sibling that is itself about
//     to be drained.
//  3. If, and only if, every step succeeded, lock the ASG at
//     min=max=desired=0 so nothing is ever relaunched.
//
// With drainOpts.DryRun set, no AWS call is made: each AWS action is replaced
// by a "[dry-run]" log line.
//
// The returned error joins every cordon, drain, and termination failure, or
// wraps the failure of the preparation or final scale-to-zero call.
//
// Safety rules:
//
//   - An instance is terminated only after its node drained cleanly. When a
//     drain fails, that instance keeps running (its pods are still on it); the
//     ASG stays at MinSize=0 with AZRebalance suspended but is NOT locked at
//     MaxSize=0, so a re-run can resume where this one stopped.
//   - prepareASGForTermination applies two precautions needed by per-instance
//     termination: MinSize=0, because AWS rejects
//     TerminateInstanceInAutoScalingGroup with ShouldDecrementDesiredCapacity
//     while MinSize == DesiredCapacity; and suspension of AZRebalance, because
//     decrementing desired capacity one instance at a time could otherwise
//     trigger a rebalance that terminates a not-yet-drained instance in
//     another AZ.
//   - Crash window: instances are terminated before the final MaxSize=0 lock,
//     so a crash (or lost AWS connectivity) after the last termination but
//     before the lock leaves the ASG at DesiredCapacity=0 (it will not
//     relaunch on its own) with its original MaxSize. A re-run cannot
//     rediscover it, because target discovery is driven by surviving
//     Kubernetes nodes, which are gone by then; the operator must re-lock
//     MaxSize manually if an external scaler might raise the desired capacity.
//     In practice the command also scales cluster-autoscaler to zero, so
//     nothing routinely raises it.
//
// The ASG itself is never deleted: it may be managed by
// Terraform/CloudFormation/Helm, and only its original owner should remove it.
func evictASG(ctx context.Context, clientset kubernetes.Interface, asg AutoscalingAPI, asgName string, nodes []string, drainOpts nodeDrainOptions) error {
	if !drainOpts.DryRun {
		if err := prepareASGForTermination(ctx, asg, asgName); err != nil {
			return fmt.Errorf("prepare ASG %s for termination: %w", asgName, err)
		}
	} else {
		log.Printf("[dry-run] would suspend AZRebalance and set MinSize=0 on ASG %s", asgName)
	}

	// Cordon the whole group first. A node that could not be cordoned is not
	// drained; its error is carried in errs and counts as a drain failure, so
	// the ASG keeps its original MaxSize for a re-run rather than being locked
	// away with workloads still on it.
	cordoned, errs := cordonNodes(ctx, clientset, nodes, drainOpts.DryRun)

	for _, n := range cordoned {
		name := n.Name
		instanceID, isEC2 := commonaws.ExtractEC2InstanceID(n)
		if !isEC2 {
			log.Printf("Warning: node %s has unexpected providerID %q; its instance will be terminated by the final scale-to-zero instead", name, n.Spec.ProviderID)
		}

		if drainErr := drainNode(ctx, clientset, name, drainOpts); drainErr != nil {
			// Workloads are still on this node: keep its instance alive.
			errs = append(errs, fmt.Errorf("drain node %s: %w", name, drainErr))
			continue
		}

		// The node drained cleanly. Terminate its instance right away and
		// decrement the desired capacity so it is not relaunched.
		switch {
		case !isEC2:
			// No usable instance ID: left for the final scale-to-zero.
		case drainOpts.DryRun:
			log.Printf("[dry-run] would terminate instance %s in ASG %s (decrementing desired capacity)", instanceID, asgName)
		default:
			if termErr := terminateASGInstance(ctx, asg, instanceID); termErr != nil {
				errs = append(errs, fmt.Errorf("terminate instance %s in ASG %s: %w", instanceID, asgName, termErr))
			}
		}
	}

	if len(errs) != 0 {
		log.Printf("ASG %s: at least one node failed to cordon, drain, or terminate; leaving the ASG at MinSize=0 without locking MaxSize. Re-run after addressing the errors above.", asgName)
		return errors.Join(errs...)
	}

	// From here on every node drained and every identifiable instance was
	// terminated. The final lock also cleans up any instance that could not be
	// terminated per-node (unexpected providerID).
	if drainOpts.DryRun {
		log.Printf("[dry-run] would scale ASG %s to min=max=desired=0", asgName)
		return nil
	}

	// Only the MaxSize=0 lock remains. A crash at this point would leave the
	// ASG at desired=0 but unlocked, and a re-run could no longer rediscover
	// it because its Kubernetes nodes are gone.
	log.Printf("ASG %s: all nodes drained; locking the ASG at min=max=desired=0.", asgName)
	if err := scaleASGToZero(ctx, asg, asgName); err != nil {
		errs = append(errs, fmt.Errorf("scale ASG %s to 0: %w", asgName, err))
	}
	return errors.Join(errs...)
}

// prepareASGForTermination readies the named ASG for the one-by-one instance
// termination done in the drain loop. It performs two calls, in this order:
//
//   - Suspend the AZRebalance process, so that decrementing desired capacity
//     one instance at a time cannot trigger an AZ rebalance that would
//     terminate a not-yet-drained instance in another availability zone.
//   - Set MinSize to 0, so that TerminateInstanceInAutoScalingGroup is allowed
//     to decrement DesiredCapacity (AWS rejects the decrement while
//     MinSize == DesiredCapacity).
//
// It returns a wrapped error from the first call that fails; if the suspend
// call fails, MinSize is not touched.
func prepareASGForTermination(ctx context.Context, asg AutoscalingAPI, asgName string) error {
	suspendInput := &autoscaling.SuspendProcessesInput{
		AutoScalingGroupName: awssdk.String(asgName),
		ScalingProcesses:     []string{"AZRebalance"},
	}
	if _, err := asg.SuspendProcesses(ctx, suspendInput); err != nil {
		return fmt.Errorf("suspend AZRebalance: %w", err)
	}

	minSizeInput := &autoscaling.UpdateAutoScalingGroupInput{
		AutoScalingGroupName: awssdk.String(asgName),
		MinSize:              awssdk.Int32(0),
	}
	if _, err := asg.UpdateAutoScalingGroup(ctx, minSizeInput); err != nil {
		return fmt.Errorf("set MinSize=0: %w", err)
	}

	log.Printf("Prepared ASG %s for termination (AZRebalance suspended, MinSize=0).", asgName)
	return nil
}

// terminateASGInstance asks the Auto Scaling API to terminate the given
// (already drained) instance with ShouldDecrementDesiredCapacity=true, so the
// group does not launch a replacement. The API error is returned unwrapped; a
// confirmation is logged only on success.
func terminateASGInstance(ctx context.Context, asg AutoscalingAPI, instanceID string) error {
	input := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
		InstanceId:                     awssdk.String(instanceID),
		ShouldDecrementDesiredCapacity: awssdk.Bool(true),
	}
	_, err := asg.TerminateInstanceInAutoScalingGroup(ctx, input)
	if err == nil {
		log.Printf("Terminated instance %s and decremented ASG desired capacity.", instanceID)
	}
	return err
}

// scaleASGToZero sets MinSize, MaxSize and DesiredCapacity of the named ASG to
// 0 in a single UpdateAutoScalingGroup call. The API error is returned
// unwrapped; a confirmation is logged only on success.
func scaleASGToZero(ctx context.Context, asg AutoscalingAPI, asgName string) error {
	zeroed := &autoscaling.UpdateAutoScalingGroupInput{
		AutoScalingGroupName: awssdk.String(asgName),
		MinSize:              awssdk.Int32(0),
		MaxSize:              awssdk.Int32(0),
		DesiredCapacity:      awssdk.Int32(0),
	}
	_, err := asg.UpdateAutoScalingGroup(ctx, zeroed)
	if err == nil {
		log.Printf("Scaled ASG %s to min=max=desired=0.", asgName)
	}
	return err
}
Loading
Loading