Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions crates/stackable-operator/crds/Scaler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: scalers.autoscaling.stackable.tech
spec:
group: autoscaling.stackable.tech
names:
categories: []
kind: Scaler
plural: scalers
shortNames: []
singular: scaler
scope: Namespaced
versions:
- additionalPrinterColumns: []
name: v1alpha1
schema:
openAPIV3Schema:
description: Auto-generated derived type for ScalerSpec via `CustomResource`
properties:
spec:
properties:
replicas:
description: |-
Desired replica count.

Written by the horizontal pod autoscaling mechanism via the /scale subresource.

NOTE: This and other replica fields)use a [`u16`] instead of a [`i32`] used by
[`k8s_openapi`] types to force a non-negative replica count. All [`u16`]s can be
converted losslessly to [`i32`]s where needed.

Upstream issues:

- https://github.com/kubernetes/kubernetes/issues/105533
- https://github.com/Arnavion/k8s-openapi/issues/136
format: uint16
maximum: 65535.0
minimum: 0.0
type: integer
required:
- replicas
type: object
status:
description: Status of a StackableScaler.
nullable: true
properties:
lastTransitionTime:
description: Timestamp indicating when the scaler state last transitioned.
format: date-time
type: string
replicas:
description: |-
The current total number of replicas targeted by the managed StatefulSet.

Exposed via the `/scale` subresource for horizontal pod autoscaling consumption.
format: uint16
maximum: 65535.0
minimum: 0.0
type: integer
selector:
description: Label selector string for HPA pod counting. Written at `.status.selector`.
nullable: true
type: string
state:
description: The current state of the scaler state machine.
properties:
details:
properties:
failedIn:
description: In which state the scaling operation failed.
enum:
- preScaling
- scaling
- postScaling
type: string
previous_replicas:
maximum: 65535.0
minimum: 0.0
type: uint16
reason:
type: string
type: object
state:
enum:
- idle
- preScaling
- scaling
- postScaling
- failed
type: string
required:
- state
type: object
required:
- replicas
- state
- lastTransitionTime
type: object
required:
- spec
title: Scaler
type: object
served: true
storage: true
subresources:
scale:
labelSelectorPath: .status.selector
specReplicasPath: .spec.replicas
statusReplicasPath: .status.replicas
status: {}
1 change: 1 addition & 0 deletions crates/stackable-operator/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod authentication;
pub mod git_sync;
pub mod listener;
pub mod s3;
pub mod scaler;

/// A reference to a product cluster (for example, a `ZookeeperCluster`)
///
Expand Down
154 changes: 154 additions & 0 deletions crates/stackable-operator/src/crd/scaler/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::borrow::Cow;

use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

#[cfg(doc)]
use crate::kvp::Annotation;
use crate::versioned::versioned;

#[versioned(version(name = "v1alpha1"))]
pub mod versioned {
#[versioned(crd(
group = "autoscaling.stackable.tech",
status = ScalerStatus,
scale(
spec_replicas_path = ".spec.replicas",
status_replicas_path = ".status.replicas",
label_selector_path = ".status.selector"
),
namespaced
))]
#[derive(Clone, Debug, PartialEq, CustomResource, Deserialize, Serialize, JsonSchema)]
pub struct ScalerSpec {
/// Desired replica count.
///
/// Written by the horizontal pod autoscaling mechanism via the /scale subresource.
///
/// NOTE: This and other replica fields)use a [`u16`] instead of a [`i32`] used by
/// [`k8s_openapi`] types to force a non-negative replica count. All [`u16`]s can be
/// converted losslessly to [`i32`]s where needed.
///
/// Upstream issues:
///
/// - https://github.com/kubernetes/kubernetes/issues/105533
/// - https://github.com/Arnavion/k8s-openapi/issues/136
pub replicas: u16,
}
}

/// Status of a StackableScaler.
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ScalerStatus {
/// The current total number of replicas targeted by the managed StatefulSet.
///
/// Exposed via the `/scale` subresource for horizontal pod autoscaling consumption.
pub replicas: u16,

/// Label selector string for HPA pod counting. Written at `.status.selector`.
#[serde(skip_serializing_if = "Option::is_none")]
pub selector: Option<String>,

/// The current state of the scaler state machine.
pub state: ScalerState,

/// Timestamp indicating when the scaler state last transitioned.
pub last_transition_time: Time,
}

// We use `#[serde(tag)]` and `#[serde(content)]` here to circumvent Kubernetes restrictions in their
// structural schema subset of OpenAPI schemas. They don't allow one variant to be typed as a string
// and others to be typed as objects. We therefore encode the variant data in a separate details
// key/object. With this, all variants can be encoded as strings, while the status can still contain
// additional data in an extra field when needed.
#[derive(Clone, Debug, Deserialize, Serialize, strum::Display)]
#[serde(
tag = "state",
content = "details",
rename_all = "camelCase",
rename_all_fields = "camelCase"
)]
#[strum(serialize_all = "camelCase")]
pub enum ScalerState {
/// No scaling operation is in progress.
Idle,

/// Running the `pre_scale` hook (e.g. data offload).
PreScaling,

/// Waiting for the StatefulSet to converge to the new replica count.
///
/// This stage additionally tracks the previous replica count to be able derive the direction
/// of the scaling operation.
Scaling { previous_replicas: u16 },

/// Running the `post_scale` hook (e.g. cluster rebalance).
///
/// This stage additionally tracks the previous replica count to be able derive the direction
/// of the scaling operation.
PostScaling { previous_replicas: u16 },

/// A hook returned an error.
///
/// The scaler stays here until the user applies the [`Annotation::autoscaling_retry`] annotation
/// to trigger a reset to [`ScalerState::Idle`].
Failed {
/// Which stage produced the error.
failed_in: FailedInState,

/// Human-readable error message from the hook.
reason: String,
},
}

// We manually implement the JSON schema instead of deriving it, because kube's schema transformer
// cannot handle the derived JsonSchema and proceeds to hit the following error: "Property "state"
// has the schema ... but was already defined as ... in another subschema. The schemas for a
// property used in multiple subschemas must be identical".
impl JsonSchema for ScalerState {
fn schema_name() -> Cow<'static, str> {
"ScalerState".into()
}

fn json_schema(generator: &mut schemars::generate::SchemaGenerator) -> schemars::Schema {
schemars::json_schema!({
"type": "object",
"required": ["state"],
"properties": {
"state": {
"type": "string",
"enum": ["idle", "preScaling", "scaling", "postScaling", "failed"]
},
"details": {
"type": "object",
"properties": {
"failedIn": generator.subschema_for::<FailedInState>(),
"previous_replicas": {
"type": "uint16",
"minimum": u16::MIN,
"maximum": u16::MAX
},
"reason": { "type": "string" }
}
}
}
})
}
}

/// In which state the scaling operation failed.
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub enum FailedInState {
/// The `pre_scale` hook returned an error.
PreScaling,

/// The StatefulSet failed to reach the desired replica count.
Scaling,

/// The `post_scale` hook returned an error.
PostScaling,
}
10 changes: 10 additions & 0 deletions crates/stackable-operator/src/kvp/annotation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ impl Annotation {
))?;
Ok(Self(kvp))
}

/// Constructs a `autoscaling.stackable.tech/retry` annotation.
pub fn autoscaling_retry(retry: bool) -> Self {
// SAFETY: We use expect here, because the input parameter can only be one of two possible
// values: true or false. This fact in combination with the known annotation key length
// allows use to use expect here, instead of bubbling up the error.
let kvp = KeyValuePair::try_from(("autoscaling.stackable.tech/retry", retry.to_string()))
.expect("autoscaling retry annotation must be valid");
Self(kvp)
}
}

/// A validated set/list of Kubernetes annotations.
Expand Down
2 changes: 2 additions & 0 deletions crates/xtask/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use stackable_operator::{
PodListenersVersion,
},
s3::{S3Bucket, S3BucketVersion, S3Connection, S3ConnectionVersion},
scaler::{Scaler, ScalerVersion},
},
kube::core::crd::MergeError,
};
Expand Down Expand Up @@ -77,6 +78,7 @@ pub fn generate_preview() -> Result<(), Error> {
write_crd!(path, PodListeners, V1Alpha1);
write_crd!(path, S3Bucket, V1Alpha1);
write_crd!(path, S3Connection, V1Alpha1);
write_crd!(path, Scaler, V1Alpha1);

write_crd!(path, DummyCluster, V1Alpha1);

Expand Down
Loading