Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api_server_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend.instrumentation import api_tracing
from cloud_pipelines_backend.instrumentation import contextual_logging
from cloud_pipelines_backend.instrumentation import otel_tracing

app = fastapi.FastAPI(
title="Cloud Pipelines API",
version="0.0.1",
separate_input_output_schemas=False,
)

# Configure OpenTelemetry tracing
otel_tracing.setup_api_tracing(app)

# Add request context middleware for automatic request_id generation
app.add_middleware(api_tracing.RequestContextMiddleware)

Expand Down
84 changes: 84 additions & 0 deletions cloud_pipelines_backend/instrumentation/otel_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""
OpenTelemetry tracing configuration for FastAPI applications.

This module sets up distributed tracing with OTLP exporter for sending traces
to an OpenTelemetry collector endpoint specified via OTEL_EXPORTER_OTLP_ENDPOINT.
"""

import logging
import os

from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter as GRPCSpanExporter,
)
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

logger = logging.getLogger(__name__)


def setup_api_tracing(app: FastAPI) -> None:
"""
Configure OpenTelemetry tracing for a FastAPI application.

Args:
app: The FastAPI application instance to instrument

Environment Variables:
OTEL_EXPORTER_OTLP_ENDPOINT: The endpoint URL for the OTLP collector
(e.g., "http://localhost:4317")
If not set, tracing will not be exported.
APP_ENV: Optional environment name to include in service name
(defaults to "development")
"""
# Get OTLP endpoint from environment variable
otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")

if not otlp_endpoint:
logger.warning(
"OTEL_EXPORTER_OTLP_ENDPOINT not configured. "
"Tracing will not be exported. Set the environment variable to enable trace export."
)
return

try:
# Build service name with environment suffix
app_env = os.environ.get("APP_ENV", "development")
service_name = f"tangle-{app_env}"

# Create a resource identifying this service
resource = Resource(attributes={SERVICE_NAME: service_name})

# Create the OTLP exporter
otlp_exporter = GRPCSpanExporter(
endpoint=otlp_endpoint,
)

# Create and configure the tracer provider
tracer_provider = TracerProvider(resource=resource)

# Add a batch span processor to export spans in batches
# This improves performance by reducing network overhead
span_processor = BatchSpanProcessor(otlp_exporter)
tracer_provider.add_span_processor(span_processor)

# Set the global tracer provider
trace.set_tracer_provider(tracer_provider)

# Instrument the FastAPI application
# This automatically creates spans for all incoming HTTP requests
FastAPIInstrumentor.instrument_app(app)

logger.info(
f"OpenTelemetry tracing configured successfully. "
f"Service: {service_name}, Endpoint: {otlp_endpoint}"
)

except Exception as e:
logger.error(f"Failed to configure OpenTelemetry tracing: {e}", exc_info=True)
# Don't raise the exception - we don't want tracing setup failures
# to prevent the application from starting
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from . import container_component_utils
from . import interfaces


_logger = logging.getLogger(__name__)

_MAX_INPUT_VALUE_SIZE = 10000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from . import container_component_utils
from . import interfaces


_logger = logging.getLogger(__name__)

_MAX_INPUT_VALUE_SIZE = 10000
Expand Down Expand Up @@ -371,6 +370,7 @@ def from_dict(
log_uri=log_uri,
)


def _parse_docker_time(date_string: str) -> datetime.datetime:
# Workaround for Python <3.11 failing to parse timestamps that include nanoseconds:
# datetime.datetime.fromisoformat("2025-10-07T04:48:35.585991509+00:00")
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ dependencies = [
"cloud-pipelines>=0.23.2.4",
"fastapi[standard]>=0.115.12",
"kubernetes>=33.1.0",
"opentelemetry-api>=1.28.2",
"opentelemetry-exporter-otlp-proto-grpc>=1.28.2",
"opentelemetry-instrumentation-fastapi>=0.49b2",
"opentelemetry-sdk>=1.28.2",
"requests<2.32.0",
"sqlalchemy>=2.0.41",
]
Expand Down
4 changes: 4 additions & 0 deletions start_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ def run_orchestrator(
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend.instrumentation import api_tracing
from cloud_pipelines_backend.instrumentation import contextual_logging
from cloud_pipelines_backend.instrumentation import otel_tracing


@contextlib.asynccontextmanager
Expand All @@ -242,6 +243,9 @@ async def lifespan(app: fastapi.FastAPI):
lifespan=lifespan,
)

# Configure OpenTelemetry tracing
otel_tracing.setup_api_tracing(app)

# Add request context middleware for automatic request_id generation
app.add_middleware(api_tracing.RequestContextMiddleware)

Expand Down
Loading