4 changes: 4 additions & 0 deletions sagemaker-core/src/sagemaker/core/helper/pipeline_variable.py
@@ -80,3 +80,7 @@ def __get_pydantic_core_schema__(cls, source_type, handler):

# This is a type that could be either string or pipeline variable
StrPipeVar = Union[str, PipelineVariable]
# This is a type that could be either integer or pipeline variable
IntPipeVar = Union[int, PipelineVariable]
# This is a type that could be either boolean or pipeline variable
BoolPipeVar = Union[bool, PipelineVariable]
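
For reference, a minimal standalone sketch of how these Union aliases let a pydantic field accept either a literal value or a pipeline variable. The `PipelineVariable` class below is a simplified stand-in for the real one (which supplies its own pydantic core schema), so `arbitrary_types_allowed` is enabled here only to make the sketch run:

```python
from typing import Union
from pydantic import BaseModel, ConfigDict


class PipelineVariable:
    """Simplified stand-in for the real PipelineVariable class."""

    def __init__(self, expr: dict):
        self.expr = expr  # e.g. {"Get": "Parameters.TrainingInstanceCount"}


# Mirrors the aliases added above
IntPipeVar = Union[int, PipelineVariable]
BoolPipeVar = Union[bool, PipelineVariable]


class ResourceConfigSketch(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    instance_count: IntPipeVar = 1
    enable_managed_spot_training: BoolPipeVar = False


# Both a concrete value and a deferred pipeline expression validate:
ResourceConfigSketch(instance_count=2)
ResourceConfigSketch(
    instance_count=PipelineVariable({"Get": "Parameters.TrainingInstanceCount"})
)
```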
8 changes: 4 additions & 4 deletions sagemaker-core/src/sagemaker/core/shapes/shapes.py
@@ -16,7 +16,7 @@
from pydantic import BaseModel, ConfigDict, Field
from typing import List, Dict, Optional, Any, Union
from sagemaker.core.utils.utils import Unassigned
from sagemaker.core.helper.pipeline_variable import StrPipeVar
from sagemaker.core.helper.pipeline_variable import StrPipeVar, IntPipeVar, BoolPipeVar

# Suppress Pydantic warnings about field names shadowing parent attributes
warnings.filterwarnings("ignore", message=".*shadows an attribute.*")
@@ -1324,10 +1324,10 @@ class ResourceConfig(Base):
"""

instance_type: Optional[StrPipeVar] = Unassigned()
instance_count: Optional[int] = Unassigned()
volume_size_in_gb: Optional[int] = Unassigned()
instance_count: Optional[IntPipeVar] = Unassigned()
volume_size_in_gb: Optional[IntPipeVar] = Unassigned()
volume_kms_key_id: Optional[StrPipeVar] = Unassigned()
keep_alive_period_in_seconds: Optional[int] = Unassigned()
keep_alive_period_in_seconds: Optional[IntPipeVar] = Unassigned()
capacity_reservation_ids: Optional[List[StrPipeVar]] = Unassigned()
instance_groups: Optional[List[InstanceGroup]] = Unassigned()
capacity_schedules_config: Optional[CapacitySchedulesConfig] = Unassigned()
10 changes: 5 additions & 5 deletions sagemaker-core/src/sagemaker/core/training/configs.py
@@ -25,7 +25,7 @@
from pydantic import BaseModel, model_validator, ConfigDict

import sagemaker.core.shapes as shapes
from sagemaker.core.helper.pipeline_variable import StrPipeVar
from sagemaker.core.helper.pipeline_variable import StrPipeVar, IntPipeVar, BoolPipeVar

# TODO: Can we add custom logic to some of these to set better defaults?
from sagemaker.core.shapes import (
@@ -158,23 +158,23 @@ class Compute(shapes.ResourceConfig):
instance_type (Optional[StrPipeVar]):
The ML compute instance type. For information about available instance types,
see https://aws.amazon.com/sagemaker/pricing/.
instance_count (Optional[int]): The number of ML compute instances to use. For distributed
instance_count (Optional[IntPipeVar]): The number of ML compute instances to use. For distributed
training, provide a value greater than 1.
volume_size_in_gb (Optional[int]):
volume_size_in_gb (Optional[IntPipeVar]):
The size of the ML storage volume that you want to provision. ML storage volumes store
model artifacts and incremental states. Training algorithms might also use the ML
storage volume for scratch space. Default: 30
volume_kms_key_id (Optional[StrPipeVar]):
The Amazon Web Services KMS key that SageMaker uses to encrypt data on the storage
volume attached to the ML compute instance(s) that run the training job.
keep_alive_period_in_seconds (Optional[int]):
keep_alive_period_in_seconds (Optional[IntPipeVar]):
The duration of time in seconds to retain configured resources in a warm pool for
subsequent training jobs.
instance_groups (Optional[List[InstanceGroup]]):
A list of instance groups for heterogeneous clusters to be used in the training job.
training_plan_arn (Optional[StrPipeVar]):
The Amazon Resource Name (ARN) of the training plan to use for this resource configuration.
enable_managed_spot_training (Optional[bool]):
enable_managed_spot_training (Optional[BoolPipeVar]):
To train models using managed spot training, choose True. Managed spot training
provides a fully managed and scalable infrastructure for training machine learning
models. This option is useful when training jobs can be interrupted and when there
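
Taken together with the `ResourceConfig` change above, this means `Compute` can now be constructed with pipeline parameters instead of literals, which is exactly what the updated integration test below does:

```python
from sagemaker.core.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.train.configs import Compute

instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# Previously instance_count was typed Optional[int], so a ParameterInteger did not
# match the declared type; with IntPipeVar it is accepted and resolved at execution time.
compute = Compute(
    instance_type=instance_type,
    instance_count=training_instance_count,
)
```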
119 changes: 70 additions & 49 deletions sagemaker-mlops/tests/integ/test_pipeline_train_registry.py
@@ -6,7 +6,12 @@
from sagemaker.train import ModelTrainer
from sagemaker.train.configs import InputData, Compute
from sagemaker.core.processing import ScriptProcessor
from sagemaker.core.shapes import ProcessingInput, ProcessingS3Input, ProcessingOutput, ProcessingS3Output
from sagemaker.core.shapes import (
ProcessingInput,
ProcessingS3Input,
ProcessingOutput,
ProcessingS3Output,
)
from sagemaker.serve.model_builder import ModelBuilder
from sagemaker.core.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.mlops.workflow.pipeline import Pipeline
@@ -37,22 +42,27 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
bucket = sagemaker_session.default_bucket()
prefix = "integ-test-v3-pipeline"
base_job_prefix = "train-registry-job"

# Upload abalone data to S3
s3_client = boto3.client('s3')
s3_client = boto3.client("s3")
abalone_path = os.path.join(os.path.dirname(__file__), "data", "pipeline", "abalone.csv")
s3_client.upload_file(abalone_path, bucket, f"{prefix}/input/abalone.csv")
input_data_s3 = f"s3://{bucket}/{prefix}/input/abalone.csv"

# Parameters
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)
instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
input_data = ParameterString(
name="InputDataUrl",
default_value=input_data_s3,
)

hyper_parameter_objective = ParameterString(
name="TrainingObjective", default_value="reg:linear"
)

cache_config = CacheConfig(enable_caching=True, expire_after="30d")

# Processing step
sklearn_processor = ScriptProcessor(
image_uri=image_uris.retrieve(
@@ -62,13 +72,13 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
py_version="py3",
instance_type="ml.m5.xlarge",
),
instance_type="ml.m5.xlarge",
instance_type=instance_type,
instance_count=processing_instance_count,
base_job_name=f"{base_job_prefix}-sklearn",
sagemaker_session=pipeline_session,
role=role,
)

processor_args = sklearn_processor.run(
inputs=[
ProcessingInput(
@@ -79,7 +89,7 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
s3_data_type="S3Prefix",
s3_input_mode="File",
s3_data_distribution_type="ShardedByS3Key",
)
),
)
],
outputs=[
@@ -88,36 +98,36 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
s3_output=ProcessingS3Output(
s3_uri=f"s3://{sagemaker_session.default_bucket()}/{prefix}/train",
local_path="/opt/ml/processing/train",
s3_upload_mode="EndOfJob"
)
s3_upload_mode="EndOfJob",
),
),
ProcessingOutput(
output_name="validation",
s3_output=ProcessingS3Output(
s3_uri=f"s3://{sagemaker_session.default_bucket()}/{prefix}/validation",
local_path="/opt/ml/processing/validation",
s3_upload_mode="EndOfJob"
)
s3_upload_mode="EndOfJob",
),
),
ProcessingOutput(
output_name="test",
s3_output=ProcessingS3Output(
s3_uri=f"s3://{sagemaker_session.default_bucket()}/{prefix}/test",
local_path="/opt/ml/processing/test",
s3_upload_mode="EndOfJob"
)
s3_upload_mode="EndOfJob",
),
),
],
code=os.path.join(os.path.dirname(__file__), "code", "pipeline", "preprocess.py"),
arguments=["--input-data", input_data],
)

step_process = ProcessingStep(
name="PreprocessData",
step_args=processor_args,
cache_config=cache_config,
)

# Training step
image_uri = image_uris.retrieve(
framework="xgboost",
@@ -126,47 +136,46 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
py_version="py3",
instance_type="ml.m5.xlarge",
)

model_trainer = ModelTrainer(
training_image=image_uri,
compute=Compute(instance_type="ml.m5.xlarge", instance_count=1),
compute=Compute(instance_type=instance_type, instance_count=training_instance_count),
base_job_name=f"{base_job_prefix}-xgboost",
sagemaker_session=pipeline_session,
role=role,
hyperparameters={
"objective": "reg:linear",
"objective": hyper_parameter_objective,
"num_round": 50,
"max_depth": 5,
},
input_data_config=[
InputData(
channel_name="train",
data_source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
content_type="text/csv"
data_source=step_process.properties.ProcessingOutputConfig.Outputs[
"train"
].S3Output.S3Uri,
content_type="text/csv",
),
],
)

train_args = model_trainer.train()
step_train = TrainingStep(
name="TrainModel",
step_args=train_args,
cache_config=cache_config,
)

# Model step
model_builder = ModelBuilder(
s3_model_data_url=step_train.properties.ModelArtifacts.S3ModelArtifacts,
image_uri=image_uri,
sagemaker_session=pipeline_session,
role_arn=role,
)

step_create_model = ModelStep(
name="CreateModel",
step_args=model_builder.build()
)


step_create_model = ModelStep(name="CreateModel", step_args=model_builder.build())

# Register step
model_package_group_name = f"integ-test-model-group-{uuid.uuid4().hex[:8]}"
step_register_model = ModelStep(
@@ -176,33 +185,39 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
content_types=["application/json"],
response_types=["application/json"],
inference_instances=["ml.m5.xlarge"],
approval_status="Approved"
)
approval_status="Approved",
),
)

# Pipeline
pipeline_name = f"integ-test-train-registry-{uuid.uuid4().hex[:8]}"
pipeline = Pipeline(
name=pipeline_name,
parameters=[processing_instance_count, input_data],
parameters=[
processing_instance_count,
training_instance_count,
instance_type,
input_data,
hyper_parameter_objective,
],
steps=[step_process, step_train, step_create_model, step_register_model],
sagemaker_session=pipeline_session,
)

model_name = None
try:
# Upsert and execute pipeline
pipeline.upsert(role_arn=role)
execution = pipeline.start()

# Poll execution status with 30 minute timeout
timeout = 1800
start_time = time.time()

while time.time() - start_time < timeout:
execution_desc = execution.describe()
execution_status = execution_desc["PipelineExecutionStatus"]

if execution_status == "Succeeded":
# Get model name from execution steps
steps = sagemaker_session.sagemaker_client.list_pipeline_execution_steps(
@@ -219,41 +234,47 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r
steps = sagemaker_session.sagemaker_client.list_pipeline_execution_steps(
PipelineExecutionArn=execution_desc["PipelineExecutionArn"]
)["PipelineExecutionSteps"]

failed_steps = []
for step in steps:
if step.get("StepStatus") == "Failed":
failure_reason = step.get("FailureReason", "Unknown reason")
failed_steps.append(f"{step['StepName']}: {failure_reason}")

failure_details = "\n".join(failed_steps) if failed_steps else "No detailed failure information available"
pytest.fail(f"Pipeline execution {execution_status}. Failed steps:\n{failure_details}")


failure_details = (
"\n".join(failed_steps)
if failed_steps
else "No detailed failure information available"
)
pytest.fail(
f"Pipeline execution {execution_status}. Failed steps:\n{failure_details}"
)

time.sleep(60)
else:
pytest.fail(f"Pipeline execution timed out after {timeout} seconds")

finally:
# Cleanup S3 resources
s3 = boto3.resource('s3')
s3 = boto3.resource("s3")
bucket_obj = s3.Bucket(bucket)
bucket_obj.objects.filter(Prefix=f'{prefix}/').delete()
bucket_obj.objects.filter(Prefix=f"{prefix}/").delete()

# Cleanup model
if model_name:
try:
sagemaker_session.sagemaker_client.delete_model(ModelName=model_name)
except Exception:
pass

# Cleanup model package group
try:
sagemaker_session.sagemaker_client.delete_model_package_group(
ModelPackageGroupName=model_package_group_name
)
except Exception:
pass

# Cleanup pipeline
try:
sagemaker_session.sagemaker_client.delete_pipeline(PipelineName=pipeline_name)
8 changes: 6 additions & 2 deletions sagemaker-train/src/sagemaker/train/utils.py
@@ -26,6 +26,7 @@
from sagemaker.core.helper.session_helper import Session
from sagemaker.core.shapes import Unassigned
from sagemaker.train import logger
from sagemaker.core.workflow.parameters import PipelineVariable


def _default_bucket_and_prefix(session: Session) -> str:
@@ -172,9 +173,10 @@ def safe_serialize(data):

This function handles the following cases:
1. If `data` is a string, it returns the string as-is without wrapping in quotes.
2. If `data` is serializable (e.g., a dictionary, list, int, float), it returns
2. If `data` is a `PipelineVariable`, it returns the variable unchanged so it can be resolved at pipeline execution time
3. If `data` is serializable (e.g., a dictionary, list, int, float), it returns
the JSON-encoded string using `json.dumps()`.
3. If `data` cannot be serialized (e.g., a custom object), it returns the string
4. If `data` cannot be serialized (e.g., a custom object), it returns the string
representation of the data using `str(data)`.

Args:
@@ -185,6 +187,8 @@ def safe_serialize(data):
"""
if isinstance(data, str):
return data
elif isinstance(data, PipelineVariable):
return data
try:
return json.dumps(data)
except TypeError:
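
A standalone sketch of the dispatch order the updated docstring describes: strings pass through, `PipelineVariable`s are returned untouched so they can be resolved at pipeline execution time, everything else goes through `json.dumps()` with `str()` as the fallback. The `PipelineVariable` class here is a stand-in, not the real sagemaker type.

```python
import json


class PipelineVariable:
    """Stand-in for the real PipelineVariable class."""


def safe_serialize(data):
    """Illustrative re-implementation of the branching shown in the diff above."""
    if isinstance(data, str):
        return data              # 1. strings pass through without quoting
    if isinstance(data, PipelineVariable):
        return data              # 2. pipeline variables are resolved later, at execution time
    try:
        return json.dumps(data)  # 3. JSON-serializable values
    except TypeError:
        return str(data)         # 4. fallback for arbitrary objects


assert safe_serialize("reg:linear") == "reg:linear"
assert safe_serialize({"max_depth": 5}) == '{"max_depth": 5}'
param = PipelineVariable()
assert safe_serialize(param) is param
```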