Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/engine/patcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,27 @@ func (p *Patcher) ReconcileAsset(ctx context.Context, assetMeta *assets.AssetMet
return false, nil
}

// Step 1.6: Clear stale thrashing state when the user removes the pause annotation.
// If in-memory state already reached the threshold (meaning the operator previously
// set the pause annotation) but the annotation is now absent, the user intentionally
// resumed reconciliation. Reset the counter so the next cycle starts from zero:
// the token bucket may still be empty and cause throttles, but those start from 0
// and will not immediately re-trigger a pause.
if liveExists {
earlyKey := throttling.MakeResourceKey(
desired.GetNamespace(),
desired.GetName(),
desired.GetKind(),
)
if p.thrashingDetector.GetAttempts(earlyKey) >= throttling.ThrashingThreshold {
logger.Info("Pause annotation removed by user, resetting thrashing state",
"name", assetMeta.Name,
"key", earlyKey,
)
p.thrashingDetector.Reset(earlyKey)
}
}

// Step 2: Check opt-out annotation (mode: unmanaged)
if liveExists && overrides.IsUnmanaged(live) {
logger.V(1).Info("Asset is unmanaged, skipping",
Expand Down
97 changes: 97 additions & 0 deletions pkg/engine/patcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

pkgassets "github.com/kubevirt/virt-platform-autopilot/pkg/assets"
Expand Down Expand Up @@ -282,6 +283,102 @@ func TestThrashingEventEmittedOnlyOnce(t *testing.T) {
}
}

// TestThrashingStateResetOnPauseAnnotationRemoval reproduces the race where a user removes
// the reconcile-paused annotation before the token bucket has refilled (< 6 s after pause).
// Without the fix the in-memory consecutiveThrottles stays >= ThrashingThreshold, so the very
// next throttled reconciliation immediately re-pauses the resource.
// With the fix the thrashing state is cleared as soon as the pause annotation is absent,
// so subsequent throttles start from 0 and do not trigger a re-pause.
func TestThrashingStateResetOnPauseAnnotationRemoval(t *testing.T) {
loader := pkgassets.NewLoader()
renderer := NewRenderer(loader)

assetMeta := &pkgassets.AssetMetadata{
Name: "psi-enable",
Path: "active/machine-config/04-psi-enable.yaml",
Component: "MachineConfig",
}

hco := pkgcontext.NewMockHCO("kubevirt-hyperconverged", "kubevirt-hyperconverged")
renderCtx := pkgcontext.NewRenderContext(hco)

desired, err := renderer.RenderAsset(assetMeta, renderCtx)
if err != nil {
t.Fatalf("failed to render asset: %v", err)
}

// Bucket: capacity 1, very long window so no natural refill during the test.
// Call 1 → token consumed; calls 2+ → throttled.
// After ThrashingThreshold throttles the operator sets the pause annotation.
smallBucket := throttling.NewTokenBucketWithSettings(1, time.Hour)
td := throttling.NewThrashingDetector()

live := desired.DeepCopy()
fakeClient := fake.NewClientBuilder().WithObjects(live).Build()

p := &Patcher{
renderer: renderer,
applier: NewApplier(fakeClient, nil),
driftDetector: &alwaysDriftChecker{},
throttle: smallBucket,
thrashingDetector: td,
client: fakeClient,
}

// Drive the patcher until it sets the pause annotation.
// Call 1 consumes the token; calls 2, 3, 4 are throttled and hit the threshold.
for i := 0; i < throttling.ThrashingThreshold+1; i++ {
//nolint:errcheck // we only care about side-effects here
p.ReconcileAsset(context.Background(), assetMeta, renderCtx)
}

// Verify the pause annotation was set on the live object.
paused := &unstructured.Unstructured{}
paused.SetGroupVersionKind(live.GroupVersionKind())
if err := fakeClient.Get(context.Background(), client.ObjectKey{
Namespace: live.GetNamespace(),
Name: live.GetName(),
}, paused); err != nil {
t.Fatalf("failed to fetch live object: %v", err)
}
annotations := paused.GetAnnotations()
if annotations == nil || annotations["platform.kubevirt.io/reconcile-paused"] != "true" {
t.Fatal("expected pause annotation to be set after edit-war threshold; was not set")
}

// Simulate user removing the pause annotation (bucket still empty — the race window).
delete(annotations, "platform.kubevirt.io/reconcile-paused")
paused.SetAnnotations(annotations)
if err := fakeClient.Update(context.Background(), paused); err != nil {
t.Fatalf("failed to remove pause annotation: %v", err)
}

// One more reconciliation while the bucket is still empty.
// Before the fix: consecutiveThrottles was still >= threshold → immediate re-pause.
// After the fix: thrashing state is cleared → throttle count resets to 1 → no re-pause.
_, reconcileErr := p.ReconcileAsset(context.Background(), assetMeta, renderCtx)

// The call must return a throttle error (not a pause error) and must NOT re-set the annotation.
if reconcileErr == nil {
t.Fatal("expected a throttle error (bucket still empty) but got nil")
}
if strings.Contains(reconcileErr.Error(), "edit war") {
t.Errorf("got pause-re-trigger error %q; want a plain throttle error — thrashing state was not reset", reconcileErr)
}

afterRemoval := &unstructured.Unstructured{}
afterRemoval.SetGroupVersionKind(live.GroupVersionKind())
if err := fakeClient.Get(context.Background(), client.ObjectKey{
Namespace: live.GetNamespace(),
Name: live.GetName(),
}, afterRemoval); err != nil {
t.Fatalf("failed to fetch live object after annotation removal: %v", err)
}
if ann := afterRemoval.GetAnnotations(); ann != nil && ann["platform.kubevirt.io/reconcile-paused"] == "true" {
t.Error("resource was immediately re-paused after user removed the annotation; thrashing state was not cleared")
}
}

func TestCountJSONPatchOperations(t *testing.T) {
tests := []struct {
name string
Expand Down