Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions clients/aws-sdk-transcribe-streaming/tests/integration/conftest.py
Comment thread
Alan4506 marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""Pytest fixtures for Transcribe Streaming integration tests.

Creates and tears down an IAM role and S3 bucket needed for medical scribe
integration tests once per test session. The ``healthscribe_resources``
fixture provides the role ARN and bucket name.
"""

import json
import uuid
from typing import Any

import boto3
Comment thread
Alan4506 marked this conversation as resolved.
import pytest

REGION = "us-east-1"

# Tags applied to all resources so orphaned resources from interrupted
# test runs can be discovered and cleaned up.
_TAGS = [{"Key": "Purpose", "Value": "IntegTest"}]


def _create_iam_role(iam_client: Any, role_name: str, bucket_name: str) -> None:
"""Create an IAM role with S3 PutObject access for Transcribe Streaming.

Args:
iam_client: A boto3 IAM client.
role_name: The name of the IAM role to create.
bucket_name: The name of the S3 bucket the role is allowed to write to.
"""
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": ["transcribe.streaming.amazonaws.com"]},
"Action": "sts:AssumeRole",
}
],
}

iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Tags=_TAGS,
)

permissions_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Action": ["s3:PutObject"],
"Resource": [
f"arn:aws:s3:::{bucket_name}",
f"arn:aws:s3:::{bucket_name}/*",
],
"Effect": "Allow",
}
],
}

iam_client.put_role_policy(
RoleName=role_name,
PolicyName="healthscribe-s3-access",
PolicyDocument=json.dumps(permissions_policy),
)


def _create_healthscribe_resources(
iam_client: Any, s3_client: Any, sts_client: Any, role_name: str, bucket_name: str
) -> str:
"""Create an IAM role and S3 bucket for medical scribe tests.

Args:
iam_client: A boto3 IAM client.
s3_client: A boto3 S3 client.
sts_client: A boto3 STS client.
role_name: The name of the IAM role to create.
bucket_name: The name of the S3 bucket to create.

Returns:
The IAM role ARN.
"""
account_id = sts_client.get_caller_identity()["Account"]

s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_bucket_tagging(Bucket=bucket_name, Tagging={"TagSet": _TAGS})
_create_iam_role(iam_client, role_name, bucket_name)

return f"arn:aws:iam::{account_id}:role/{role_name}"


def _delete_healthscribe_resources(
iam_client: Any, s3_client: Any, role_name: str, bucket_name: str
) -> None:
"""Delete the IAM role and S3 bucket created for tests.

Args:
iam_client: A boto3 IAM client.
s3_client: A boto3 S3 client.
role_name: The name of the IAM role to delete.
bucket_name: The name of the S3 bucket to delete.
"""
# Empty and delete the bucket
try:
paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket_name):
objects = page.get("Contents")
if not objects:
continue
s3_client.delete_objects(
Bucket=bucket_name,
Delete={"Objects": [{"Key": o["Key"]} for o in objects]},
)
s3_client.delete_bucket(Bucket=bucket_name)
except s3_client.exceptions.NoSuchBucket:
pass

# Delete inline policy then role
try:
iam_client.delete_role_policy(
RoleName=role_name, PolicyName="healthscribe-s3-access"
)
except iam_client.exceptions.NoSuchEntityException:
pass

try:
iam_client.delete_role(RoleName=role_name)
except iam_client.exceptions.NoSuchEntityException:
pass


@pytest.fixture(scope="session")
def healthscribe_resources():
"""Create HealthScribe resources for the test session and delete them after."""
# Shortened UUID to keep IAM role name under the 64-character limit.
unique_suffix = uuid.uuid4().hex[:16]
role_name = f"integ-test-transcribe-streaming-role-{unique_suffix}"
bucket_name = f"integ-test-transcribe-streaming-bucket-{unique_suffix}"

iam_client = boto3.client("iam")
s3_client = boto3.client("s3", region_name=REGION)
sts_client = boto3.client("sts")

try:
role_arn = _create_healthscribe_resources(
iam_client, s3_client, sts_client, role_name, bucket_name
)
yield role_arn, bucket_name
finally:
_delete_healthscribe_resources(iam_client, s3_client, role_name, bucket_name)
Original file line number Diff line number Diff line change
@@ -1,24 +1,14 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""Test non-streaming output type handling.

This test requires AWS resources (an IAM role and an S3 bucket).
To set them up locally, run:

uv run scripts/setup_resources.py

Then export the environment variables shown in the output.
"""
"""Test non-streaming output type handling using Medical Scribe."""

import asyncio
import os
import time
import uuid

import pytest

from aws_sdk_transcribe_streaming.models import (
BadRequestException,
ClinicalNoteGenerationSettings,
GetMedicalScribeStreamInput,
GetMedicalScribeStreamOutput,
Expand All @@ -42,14 +32,14 @@
CHANNEL_NUMS = 1
CHUNK_SIZE = 1024 * 8

# Maximum time to wait for IAM role propagation across services.
ROLE_PROPAGATION_TIMEOUT = 300
# Delay between retries while waiting for IAM role propagation.
ROLE_PROPAGATION_RETRY_DELAY = 5

async def test_get_medical_scribe_stream() -> None:
role_arn = os.environ.get("HEALTHSCRIBE_ROLE_ARN")
s3_bucket = os.environ.get("HEALTHSCRIBE_S3_BUCKET")

if not role_arn or not s3_bucket:
pytest.fail("HEALTHSCRIBE_ROLE_ARN or HEALTHSCRIBE_S3_BUCKET not set")

async def _run_medical_scribe_session(role_arn: str, s3_bucket: str) -> None:
"""Run a full Medical Scribe streaming session and verify its completion."""
transcribe_client = create_transcribe_client("us-east-1")
session_id = str(uuid.uuid4())

Expand Down Expand Up @@ -121,3 +111,29 @@ async def test_get_medical_scribe_stream() -> None:
assert details.language_code == "en-US"
assert details.media_encoding == "pcm"
assert details.media_sample_rate_hertz == SAMPLE_RATE


async def test_get_medical_scribe_stream(
healthscribe_resources: tuple[str, str],
) -> None:
"""Test non-streaming GetMedicalScribeStream operation.

IAM is eventually consistent, so Transcribe may not be able to assume the
newly created role immediately. Retry on BadRequestException until the
role has propagated, or until the timeout is reached.
"""
role_arn, s3_bucket = healthscribe_resources

last_error: BadRequestException | None = None
try:
async with asyncio.timeout(ROLE_PROPAGATION_TIMEOUT):
while True:
try:
await _run_medical_scribe_session(role_arn, s3_bucket)
return
except BadRequestException as e:
last_error = e
await asyncio.sleep(ROLE_PROPAGATION_RETRY_DELAY)
except TimeoutError:
assert last_error is not None
raise last_error
97 changes: 0 additions & 97 deletions clients/aws-sdk-transcribe-streaming/tests/setup_resources.py

This file was deleted.