Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 55 additions & 6 deletions api/v1alpha1/workerdeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ type WorkerDeploymentSpec struct {
// How to rollout new workflow executions to the target version.
RolloutStrategy RolloutStrategy `json:"rollout"`

// How to rollback to a previous version. If not specified, defaults to AllAtOnce strategy.
//
// A rollback is triggered automatically when the target version's pod spec is updated and
// the resulting build ID has previously been set as the default (current) version of the
// worker deployment. The controller detects this by checking whether Temporal recorded a
// non-nil LastCurrentTime for that build ID.
//
// The rollback strategy controls routing of NEW workflow executions only. Workflows already
// running are pinned to the version they started on and continue executing there; they are
// not affected by the rollback. Only new workflow executions will be routed to the rollback
// target version.
//
// Rollback is suppressed when the rollout strategy is Manual.
// +optional
RollbackStrategy *RollbackStrategy `json:"rollback,omitempty"`
Comment thread
eniko-dif marked this conversation as resolved.

// How to manage sunsetting drained versions.
SunsetStrategy SunsetStrategy `json:"sunset"`

Expand Down Expand Up @@ -338,17 +354,17 @@ type DeprecatedWorkerDeploymentVersion struct {
EligibleForDeletion bool `json:"eligibleForDeletion,omitempty"`
}

// DefaultVersionUpdateStrategy describes how to cut over new workflow executions
// VersionRolloutStrategy describes how to cut over new workflow executions
// to the target worker deployment version.
// +kubebuilder:validation:Enum=Manual;AllAtOnce;Progressive
type DefaultVersionUpdateStrategy string
type VersionRolloutStrategy string

const (
// UpdateManual scales worker resources up or down, but does not update the current or ramping worker deployment version.
UpdateManual DefaultVersionUpdateStrategy = "Manual"
UpdateManual VersionRolloutStrategy = "Manual"

// UpdateAllAtOnce starts 100% of new workflow executions on the new worker deployment version as soon as it's healthy.
UpdateAllAtOnce DefaultVersionUpdateStrategy = "AllAtOnce"
UpdateAllAtOnce VersionRolloutStrategy = "AllAtOnce"

// UpdateProgressive ramps up the percentage of new workflow executions targeting the new worker deployment version over time.
//
Expand All @@ -358,7 +374,19 @@ const (
// Sending a percentage of traffic to a "nil" version means that traffic will be sent to unversioned workers. If
// there are no unversioned workers, those tasks will get stuck. This behavior ensures that all traffic on the task
// queues in this worker deployment can be handled by an active poller.
UpdateProgressive DefaultVersionUpdateStrategy = "Progressive"
UpdateProgressive VersionRolloutStrategy = "Progressive"
)

// VersionRollbackStrategy describes how to cut over during rollback to a previous version.
// Unlike VersionRolloutStrategy it has no "Manual" value: the enum marker below admits
// only "AllAtOnce" and "Progressive".
// +kubebuilder:validation:Enum=AllAtOnce;Progressive
type VersionRollbackStrategy string

const (
// RollbackAllAtOnce immediately switches 100% of traffic back to the previous version.
// It is the value the defaulting code assigns when no rollback strategy is set.
RollbackAllAtOnce VersionRollbackStrategy = "AllAtOnce"

// RollbackProgressive gradually ramps traffic back to the previous version.
// Webhook validation requires a non-empty list of steps when this value is used.
RollbackProgressive VersionRollbackStrategy = "Progressive"
)

type GateWorkflowConfig struct {
Expand Down Expand Up @@ -393,7 +421,7 @@ type RolloutStrategy struct {
// - "Manual"
// - "AllAtOnce"
// - "Progressive"
Strategy DefaultVersionUpdateStrategy `json:"strategy"`
Strategy VersionRolloutStrategy `json:"strategy"`

// Gate specifies a workflow type that must run once to completion on the new worker deployment version before
// any traffic is directed to the new version.
Expand All @@ -405,6 +433,27 @@ type RolloutStrategy struct {
Steps []RolloutStep `json:"steps,omitempty" protobuf:"bytes,3,rep,name=steps"`
}

// RollbackStrategy defines strategy to apply when rolling back to a previous version.
// This is separate from RolloutStrategy because rollbacks have different requirements:
// - No gate workflow (already trusted version)
// - No manual mode (rollbacks should be automatic)
// - Default to AllAtOnce for fast recovery
type RollbackStrategy struct {
Comment thread
eniko-dif marked this conversation as resolved.
// Strategy for rollback. Valid values are "AllAtOnce" or "Progressive".
// Defaults to "AllAtOnce" for fast recovery.
Strategy VersionRollbackStrategy `json:"strategy"`

// Steps to execute progressive rollbacks. Only required when strategy is "Progressive".
// +optional
Steps []RolloutStep `json:"steps,omitempty"`
Comment thread
eniko-dif marked this conversation as resolved.

// MaxVersionAge limits which versions are eligible as rollback targets.
// A version is only considered a rollback target if it was last current within this duration.
// If unset, the defaulting webhook fills in defaults.RollbackMaxVersionAge, so omitting this
// field does not remove the age limit.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default applied in the webhook is 1h though, so this comment doesn't match current behavior. We should also document the meaning of setting to 0.

Right now I don't think there is a way to achieve the "no age limit" behavior, beyond setting a very high value? (I think this is fine, but just clarifying since it seems like you can either specify a limit or disable rollback entirely by setting to 0).

// +optional
MaxVersionAge *metav1.Duration `json:"maxVersionAge,omitempty"`
}

// SunsetStrategy defines strategy to apply when sunsetting k8s deployments of drained versions.
type SunsetStrategy struct {
// ScaledownDelay specifies how long to wait after a version is drained before scaling its Deployment to zero.
Expand Down
89 changes: 79 additions & 10 deletions api/v1alpha1/workerdeployment_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package v1alpha1
import (
"context"
"fmt"
"time"

"github.com/temporalio/temporal-worker-controller/internal/defaults"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -53,6 +54,14 @@ func (s *WorkerDeploymentSpec) Default(ctx context.Context) error {
s.SunsetStrategy.DeleteDelay = &v1.Duration{Duration: defaults.DeleteDelay}
}

if s.RollbackStrategy == nil {
s.RollbackStrategy = &RollbackStrategy{Strategy: RollbackAllAtOnce}
Comment thread
jlegrone marked this conversation as resolved.
} else if s.RollbackStrategy.Strategy == "" {
s.RollbackStrategy.Strategy = RollbackAllAtOnce
}
Comment on lines +57 to +61
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this is entirely a matter of taste, but I think two if statements are easier to read than the if/else flow (it is clearer that the strategy is always AllAtOnce if left unset).

Suggested change
if s.RollbackStrategy == nil {
s.RollbackStrategy = &RollbackStrategy{Strategy: RollbackAllAtOnce}
} else if s.RollbackStrategy.Strategy == "" {
s.RollbackStrategy.Strategy = RollbackAllAtOnce
}
if s.RollbackStrategy == nil {
s.RollbackStrategy = &RollbackStrategy{}
}
if s.RollbackStrategy.Strategy == "" {
s.RollbackStrategy.Strategy = RollbackAllAtOnce
}

if s.RollbackStrategy.MaxVersionAge == nil {
s.RollbackStrategy.MaxVersionAge = &v1.Duration{Duration: defaults.RollbackMaxVersionAge}
}
return nil
}

Expand Down Expand Up @@ -81,10 +90,17 @@ func (r *WorkerDeployment) validateForUpdateOrCreate(ctx context.Context, obj ru
}

// validateForUpdateOrCreate runs the spec validation shared by the create and
// update admission paths.
//
// The rollout strategy is always validated; the rollback strategy is validated
// only when it is set. If any field error is found, the errors are returned as a
// single Invalid status error and no warnings are produced. Otherwise, when a
// rollback strategy is present, the result of warnRollbackSlowerThanRollout is
// returned as non-blocking admission warnings.
//
// The old object is accepted for signature parity with the update path but is
// not consulted here.
func validateForUpdateOrCreate(old, new *WorkerDeployment) (admission.Warnings, error) {
	// Declare the accumulator exactly once; rollout validation feeds it first.
	var allErrs field.ErrorList
	allErrs = append(allErrs, validateRolloutStrategy(new.Spec.RolloutStrategy)...)

	rollback := new.Spec.RollbackStrategy
	if rollback != nil {
		allErrs = append(allErrs, validateRollbackStrategy(*rollback)...)
	}

	// Errors take precedence: warnings are only computed for an otherwise valid spec.
	if len(allErrs) > 0 {
		return nil, newInvalidErr(new, allErrs)
	}
	if rollback == nil {
		return nil, nil
	}
	return warnRollbackSlowerThanRollout(new.Spec.RolloutStrategy, *rollback), nil
}

Expand All @@ -96,15 +112,7 @@ func validateRolloutStrategy(s RolloutStrategy) []*field.Error {
var allErrs []*field.Error

if s.Strategy == UpdateProgressive {
var lastRamp int
for i, step := range s.Steps {
if step.RampPercentage <= lastRamp {
allErrs = append(allErrs,
field.Invalid(field.NewPath(fmt.Sprintf("spec.rollout.steps[%d].rampPercentage", i)), step.RampPercentage, "rampPercentage must increase between each step"),
)
}
lastRamp = step.RampPercentage
}
allErrs = append(allErrs, validateProgressiveStrategySteps("spec.rollout.steps", s.Steps)...)
}

if s.Gate != nil && s.Gate.Input != nil && s.Gate.InputFrom != nil {
Expand All @@ -117,6 +125,67 @@ func validateRolloutStrategy(s RolloutStrategy) []*field.Error {
return allErrs
}

// validateRollbackStrategy checks the rollback section of the spec and returns
// the field errors it finds. Only the Progressive strategy carries additional
// constraints: its steps are checked by validateProgressiveStrategySteps under
// the "spec.rollback.steps" path. Any other strategy yields no errors.
func validateRollbackStrategy(s RollbackStrategy) []*field.Error {
	if s.Strategy != RollbackProgressive {
		return nil
	}

	var errs []*field.Error
	return append(errs, validateProgressiveStrategySteps("spec.rollback.steps", s.Steps)...)
}

// warnRollbackSlowerThanRollout returns an admission warning when the configured
// rollback would take longer than the rollout, and nil otherwise.
//
// Two situations produce a warning:
//   - the rollout is AllAtOnce but the rollback is anything other than AllAtOnce;
//   - both are Progressive and the summed pause durations of the rollback steps
//     exceed the summed pause durations of the rollout steps.
//
// Every other combination (including a Manual rollout) returns nil.
func warnRollbackSlowerThanRollout(rollout RolloutStrategy, rollback RollbackStrategy) admission.Warnings {
	if rollout.Strategy == UpdateAllAtOnce {
		if rollback.Strategy == RollbackAllAtOnce {
			return nil
		}
		return admission.Warnings{"rollback strategy is slower than rollout: rollout is AllAtOnce, but rollback is Progressive — is that intended?"}
	}

	if rollout.Strategy != UpdateProgressive || rollback.Strategy != RollbackProgressive {
		return nil
	}

	// totalPause adds up the pause duration of every step in the list.
	totalPause := func(steps []RolloutStep) time.Duration {
		var sum time.Duration
		for i := range steps {
			sum += steps[i].PauseDuration.Duration
		}
		return sum
	}

	rolloutPause := totalPause(rollout.Steps)
	rollbackPause := totalPause(rollback.Steps)
	if rollbackPause <= rolloutPause {
		return nil
	}

	msg := fmt.Sprintf(
		"rollback strategy is slower than rollout: progressive rollback total duration (%s) exceeds progressive rollout total duration (%s) — is that intended?",
		rollbackPause, rolloutPause,
	)
	return admission.Warnings{msg}
}

// validateProgressiveStrategySteps validates the step list of a Progressive
// strategy and returns one field error per violation.
//
// specName is the field path prefix used in the reported errors (for example
// "spec.rollout.steps"). The checks are:
//   - the list must not be empty;
//   - every step's pause duration must be at least 30s;
//   - every step's rampPercentage must be strictly greater than the previous
//     step's (the first step is compared against 0).
func validateProgressiveStrategySteps(specName string, steps []RolloutStep) []*field.Error {
	const minPause = 30 * time.Second

	var errs []*field.Error
	if len(steps) == 0 {
		errs = append(errs, field.Invalid(field.NewPath(specName), steps, "steps are required for Progressive strategy"))
	}

	prevRamp := 0
	for idx := range steps {
		stepPath := fmt.Sprintf("%s[%d]", specName, idx)
		pause := steps[idx].PauseDuration.Duration
		ramp := steps[idx].RampPercentage

		if pause < minPause {
			errs = append(errs, field.Invalid(
				field.NewPath(stepPath+".pauseDuration"),
				pause.String(),
				"pause duration must be at least 30s",
			))
		}
		if ramp <= prevRamp {
			errs = append(errs, field.Invalid(
				field.NewPath(stepPath+".rampPercentage"),
				ramp,
				"rampPercentage must increase between each step",
			))
		}
		// Track the ramp even when it was rejected so the next step is compared
		// against its immediate predecessor.
		prevRamp = ramp
	}

	return errs
}

// newInvalidErr wraps the given field errors in an Invalid API status error
// identified by the WorkerDeployment's group/kind and name.
func newInvalidErr(dep *WorkerDeployment, errs field.ErrorList) *apierrors.StatusError {
	groupKind := dep.GroupVersionKind().GroupKind()
	return apierrors.NewInvalid(groupKind, dep.GetName(), errs)
}
Loading