diff --git a/infra/main.bicep b/infra/main.bicep index 9f4ec91e..1e732bbe 100644 --- a/infra/main.bicep +++ b/infra/main.bicep @@ -881,7 +881,12 @@ module avmContainerApp 'br/public:avm/res/app/container-app:0.22.1' = { environmentResourceId: avmContainerAppEnv.outputs.resourceId workloadProfileName: 'Consumption' enableTelemetry: enableTelemetry - registries: null + registries: [ + { + server: containerRegistryEndpoint + identity: avmContainerRegistryReader.outputs.resourceId + } + ] managedIdentities: { systemAssigned: true userAssignedResourceIds: [ @@ -950,7 +955,12 @@ module avmContainerApp_API 'br/public:avm/res/app/container-app:0.22.1' = { environmentResourceId: avmContainerAppEnv.outputs.resourceId workloadProfileName: 'Consumption' enableTelemetry: enableTelemetry - registries: null + registries: [ + { + server: containerRegistryEndpoint + identity: avmContainerRegistryReader.outputs.resourceId + } + ] tags: tags managedIdentities: { systemAssigned: true @@ -1082,7 +1092,12 @@ module avmContainerApp_Web 'br/public:avm/res/app/container-app:0.22.1' = { environmentResourceId: avmContainerAppEnv.outputs.resourceId workloadProfileName: 'Consumption' enableTelemetry: enableTelemetry - registries: null + registries: [ + { + server: containerRegistryEndpoint + identity: avmContainerRegistryReader.outputs.resourceId + } + ] tags: tags managedIdentities: { systemAssigned: true @@ -1165,7 +1180,12 @@ module avmContainerApp_Workflow 'br/public:avm/res/app/container-app:0.22.1' = { environmentResourceId: avmContainerAppEnv.outputs.resourceId workloadProfileName: 'Consumption' enableTelemetry: enableTelemetry - registries: null + registries: [ + { + server: containerRegistryEndpoint + identity: avmContainerRegistryReader.outputs.resourceId + } + ] tags: tags managedIdentities: { systemAssigned: true @@ -1543,7 +1563,12 @@ module avmContainerApp_update 'br/public:avm/res/app/container-app:0.22.1' = { enableTelemetry: enableTelemetry environmentResourceId: avmContainerAppEnv.outputs.resourceId workloadProfileName: 'Consumption' - registries: null + registries: [ + { + server: containerRegistryEndpoint + identity: avmContainerRegistryReader.outputs.resourceId + } + ] tags: tags managedIdentities: { systemAssigned: true @@ -1625,7 +1650,12 @@ module avmContainerApp_API_update 'br/public:avm/res/app/container-app:0.22.1' = enableTelemetry: enableTelemetry environmentResourceId: avmContainerAppEnv.outputs.resourceId workloadProfileName: 'Consumption' - registries: null + registries: [ + { + server: containerRegistryEndpoint + identity: avmContainerRegistryReader.outputs.resourceId + } + ] tags: tags managedIdentities: { systemAssigned: true @@ -1761,7 +1791,12 @@ module avmContainerApp_Workflow_update 'br/public:avm/res/app/container-app:0.22 enableTelemetry: enableTelemetry environmentResourceId: avmContainerAppEnv.outputs.resourceId workloadProfileName: 'Consumption' - registries: null + registries: [ + { + server: containerRegistryEndpoint + identity: avmContainerRegistryReader.outputs.resourceId + } + ] tags: tags managedIdentities: { systemAssigned: true diff --git a/infra/main.json b/infra/main.json index e8ba8e73..b1e3d0c2 100644 --- a/infra/main.json +++ b/infra/main.json @@ -6,7 +6,7 @@ "_generator": { "name": "bicep", "version": "0.42.1.51946", - "templateHash": "5885652317352749587" + "templateHash": "14320065740070986438" }, "name": "Content Processing Solution Accelerator", "description": "Bicep template to deploy the Content Processing Solution Accelerator with AVM compliance." @@ -243,8 +243,6 @@ "bastionHostName": "[format('bas-{0}', variables('solutionSuffix'))]", "jumpboxVmName": "[take(format('vm-{0}', variables('solutionSuffix')), 15)]", "dataCollectionRulesResourceName": "[format('dcr-{0}', variables('solutionSuffix'))]", - "logAnalyticsWorkspaceResourceName": "[format('log-{0}', variables('solutionSuffix'))]", - "dcrLogAnalyticsDestinationName": "[format('la-{0}-destination', variables('logAnalyticsWorkspaceResourceName'))]", "privateDnsZones": [ "privatelink.cognitiveservices.azure.com", "privatelink.openai.azure.com", @@ -14609,10 +14607,19 @@ { "name": "SecurityAuditEvents", "streams": [ - "Microsoft-Event" + "Microsoft-WindowsEvent" + ], + "eventLogName": "Security", + "eventTypes": [ + { + "eventType": "Audit Success" + }, + { + "eventType": "Audit Failure" + } ], "xPathQueries": [ - "Security!*[System[(band(Keywords,13510798882111488)) and (EventID != 4624)]]" + "Security!*[System[(EventID=4624 or EventID=4625)]]" ] } ] @@ -14621,7 +14628,7 @@ "logAnalytics": [ { "workspaceResourceId": "[reference('logAnalyticsWorkspace').outputs.resourceId.value]", - "name": "[variables('dcrLogAnalyticsDestinationName')]" + "name": "[format('la-{0}', variables('dataCollectionRulesResourceName'))]" } ] }, @@ -14631,20 +14638,10 @@ "Microsoft-Perf" ], "destinations": [ - "[variables('dcrLogAnalyticsDestinationName')]" + "[format('la-{0}', variables('dataCollectionRulesResourceName'))]" ], "transformKql": "source", "outputStream": "Microsoft-Perf" - }, - { - "streams": [ - "Microsoft-Event" - ], - "destinations": [ - "[variables('dcrLogAnalyticsDestinationName')]" - ], - "transformKql": "source", - "outputStream": "Microsoft-Event" } ] } @@ -19275,7 +19272,7 @@ "mode": "Incremental", "parameters": { "name": { - "value": "[variables('logAnalyticsWorkspaceResourceName')]" + "value": "[format('log-{0}', variables('solutionSuffix'))]" }, "location": { "value": "[parameters('location')]" @@ -28053,9 +28050,6 @@ "ipRules": [] } }, - "requireInfrastructureEncryption": { - "value": true - }, "supportsHttpsTrafficOnly": { "value": true }, @@ -36183,8 +36177,8 @@ "avmContainerApp_API", "avmContainerApp_Workflow", "avmManagedIdentity", - "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').storageQueue)]", "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').storageBlob)]", + "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').storageQueue)]", "virtualNetwork" ] }, @@ -42488,9 +42482,9 @@ "dependsOn": [ "avmAiServices", "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').contentUnderstanding)]", - "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').openAI)]", - "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').aiServices)]", "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').cognitiveServices)]", + "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').aiServices)]", + "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').openAI)]", "virtualNetwork" ] }, @@ -44170,7 +44164,12 @@ "value": "[parameters('enableTelemetry')]" }, "registries": { - "value": null + "value": [ + { + "server": "[parameters('containerRegistryEndpoint')]", + "identity": "[reference('avmContainerRegistryReader').outputs.resourceId.value]" + } + ] }, "managedIdentities": { "value": { @@ -45801,7 +45800,12 @@ "value": "[parameters('enableTelemetry')]" }, "registries": { - "value": null + "value": [ + { + "server": "[parameters('containerRegistryEndpoint')]", + "identity": "[reference('avmContainerRegistryReader').outputs.resourceId.value]" + } + ] }, "tags": { "value": "[parameters('tags')]" @@ -45916,9 +45920,6 @@ "ingressTransport": { "value": "auto" }, - "ingressAllowInsecure": { - "value": false - }, "corsPolicy": { "value": { "allowedOrigins": [ @@ -47499,7 +47500,12 @@ "value": "[parameters('enableTelemetry')]" }, "registries": { - "value": null + "value": [ + { + "server": "[parameters('containerRegistryEndpoint')]", + "identity": "[reference('avmContainerRegistryReader').outputs.resourceId.value]" + } + ] }, "tags": { "value": "[parameters('tags')]" @@ -47524,9 +47530,6 @@ "ingressTransport": { "value": "auto" }, - "ingressAllowInsecure": { - "value": false - }, "scaleSettings": { "value": { "maxReplicas": "[if(parameters('enableScalability'), 3, 2)]", @@ -49150,7 +49153,12 @@ "value": "[parameters('enableTelemetry')]" }, "registries": { - "value": null + "value": [ + { + "server": "[parameters('containerRegistryEndpoint')]", + "identity": "[reference('avmContainerRegistryReader').outputs.resourceId.value]" + } + ] }, "tags": { "value": "[parameters('tags')]" @@ -50799,6 +50807,9 @@ "EnableMongo" ] }, + "enableAnalyticalStorage": { + "value": true + }, "defaultConsistencyLevel": { "value": "Session" }, @@ -61435,7 +61446,12 @@ "value": "Consumption" }, "registries": { - "value": null + "value": [ + { + "server": "[parameters('containerRegistryEndpoint')]", + "identity": "[reference('avmContainerRegistryReader').outputs.resourceId.value]" + } + ] }, "tags": { "value": "[parameters('tags')]" @@ -63069,7 +63085,12 @@ "value": "Consumption" }, "registries": { - "value": null + "value": [ + { + "server": "[parameters('containerRegistryEndpoint')]", + "identity": "[reference('avmContainerRegistryReader').outputs.resourceId.value]" + } + ] }, "tags": { "value": "[parameters('tags')]" @@ -63184,9 +63205,6 @@ "ingressTransport": { "value": "auto" }, - "ingressAllowInsecure": { - "value": false - }, "corsPolicy": { "value": { "allowedOrigins": [ @@ -64769,7 +64787,12 @@ "value": "Consumption" }, "registries": { - "value": null + "value": [ + { + "server": "[parameters('containerRegistryEndpoint')]", + "identity": "[reference('avmContainerRegistryReader').outputs.resourceId.value]" + } + ] }, "tags": { "value": "[parameters('tags')]" diff --git a/infra/main.parameters.json b/infra/main.parameters.json index 44153d57..3cd4bcaa 100644 --- a/infra/main.parameters.json +++ b/infra/main.parameters.json @@ -34,6 +34,9 @@ }, "imageTag": { "value": "${AZURE_ENV_IMAGETAG=latest_v2}" + }, + "enableMonitoring": { + "value": false } } } \ No newline at end of file diff --git a/src/ContentProcessor/pyproject.toml b/src/ContentProcessor/pyproject.toml index 310524ce..e513a380 100644 --- a/src/ContentProcessor/pyproject.toml +++ b/src/ContentProcessor/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "azure-ai-inference==1.0.0b9", "azure-appconfiguration==1.8.0", "azure-identity==1.26.0b1", + "azure-monitor-events-extension==0.1.0", "azure-monitor-opentelemetry==1.8.7", "azure-storage-blob==12.29.0b1", "azure-storage-queue==12.16.0b1", diff --git a/src/ContentProcessor/src/libs/llm_token_telemetry.py b/src/ContentProcessor/src/libs/llm_token_telemetry.py new file mode 100644 index 00000000..ae8aa383 --- /dev/null +++ b/src/ContentProcessor/src/libs/llm_token_telemetry.py @@ -0,0 +1,1003 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Cross-accelerator LLM token-usage telemetry helpers. + +A single, dependency-light helper module that can be dropped into any Microsoft +Solution Accelerator to capture LLM token usage and emit standardized custom +events to Application Insights. + +Why this file exists +-------------------- +Seven solution accelerators have independently shipped near-identical +``token_usage_utils.py`` modules. They all: + +* extract token counts from agent_framework / Azure OpenAI responses, +* emit the same three custom events (``LLM_Token_Usage_Summary``, + ``LLM_Agent_Token_Usage``, ``LLM_Model_Token_Usage``), +* defensively swallow telemetry errors, +* duplicate the same KQL queries and Azure Workbook. + +This module consolidates the union of those behaviours behind one stable API +so each accelerator can replace its bespoke helper with an import. + +Public API +---------- +- ``TokenUsage`` -- immutable dataclass for counts +- ``extract_usage(obj)`` -- agent_framework run result / message +- ``extract_usage_from_dict(d)`` -- raw dict from any SDK +- ``extract_usage_from_stream_chunk`` -- streaming chunks +- ``extract_realtime_usage(resp)`` -- Azure AI Voice Live response.done +- ``TokenUsageEmitter`` -- emits the three events + optional + per-user / per-team / speech events +- ``TokenUsageScope`` -- context-manager that accumulates and + auto-emits on exit +- ``track_tokens`` -- decorator wrapper around the scope + +Design rules +------------ +* Telemetry NEVER raises. Extraction failures return ``None``; emission + failures are logged at WARNING. +* No hard dependency on ``azure-monitor-events-extension``; if absent the + emitter degrades to logging only. +* Arbitrary correlation dimensions are passed as ``**dimensions`` kwargs and + surface verbatim as custom-event properties. This is how each accelerator + attaches its own keys (``conversation_id``, ``process_id``, ``team_name``, + ``file_name``, ``tenant``, etc.) without forking the helper. +""" +from __future__ import annotations + +import asyncio +import functools +import hashlib +import logging +import os +import random +import time +from contextlib import AbstractContextManager +from dataclasses import dataclass, field +from typing import Any, Callable, Iterable, Mapping, Optional +from unittest.mock import NonCallableMock + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Event-name constants -- keep these stable; KQL queries and workbooks bind +# to these exact strings. +# --------------------------------------------------------------------------- +EVENT_SUMMARY = "LLM_Token_Usage_Summary" +EVENT_AGENT = "LLM_Agent_Token_Usage" +EVENT_MODEL = "LLM_Model_Token_Usage" +EVENT_USER = "LLM_User_Token_Usage" +EVENT_TEAM = "LLM_Team_Token_Usage" +EVENT_SPEECH = "Speech_Usage" + + +# Token-count field aliases observed across model providers / SDK versions. +_INPUT_KEYS = ( + "input_token_count", + "input_tokens", + "prompt_tokens", + "promptTokens", +) +_OUTPUT_KEYS = ( + "output_token_count", + "output_tokens", + "completion_tokens", + "completionTokens", +) +_TOTAL_KEYS = ( + "total_token_count", + "total_tokens", + "totalTokens", +) + + +# --------------------------------------------------------------------------- +# Data model +# --------------------------------------------------------------------------- +@dataclass(frozen=True) +class TokenUsage: + """Normalized token-usage record. + + Attributes: + input_tokens: Number of input/prompt tokens consumed. + output_tokens: Number of output/completion tokens consumed. + total_tokens: Total token count (input + output). + input_audio_tokens: Audio input tokens (realtime/voice only). + input_text_tokens: Text input tokens (realtime/voice only). + input_cached_tokens: Cached input tokens (realtime/voice only). + output_audio_tokens: Audio output tokens (realtime/voice only). + output_text_tokens: Text output tokens (realtime/voice only). + """ + + input_tokens: int = 0 + output_tokens: int = 0 + total_tokens: int = 0 + + input_audio_tokens: Optional[int] = None + input_text_tokens: Optional[int] = None + input_cached_tokens: Optional[int] = None + output_audio_tokens: Optional[int] = None + output_text_tokens: Optional[int] = None + + @property + def has_any(self) -> bool: + """True if any token count is non-zero.""" + return bool(self.input_tokens or self.output_tokens or self.total_tokens) + + def __add__(self, other: "TokenUsage") -> "TokenUsage": + if not isinstance(other, TokenUsage): + return NotImplemented + + def _sum(a: Optional[int], b: Optional[int]) -> Optional[int]: + if a is None and b is None: + return None + return (a or 0) + (b or 0) + + return TokenUsage( + input_tokens=self.input_tokens + other.input_tokens, + output_tokens=self.output_tokens + other.output_tokens, + total_tokens=self.total_tokens + other.total_tokens, + input_audio_tokens=_sum(self.input_audio_tokens, other.input_audio_tokens), + input_text_tokens=_sum(self.input_text_tokens, other.input_text_tokens), + input_cached_tokens=_sum(self.input_cached_tokens, other.input_cached_tokens), + output_audio_tokens=_sum(self.output_audio_tokens, other.output_audio_tokens), + output_text_tokens=_sum(self.output_text_tokens, other.output_text_tokens), + ) + + def to_event_props(self) -> dict[str, str]: + """Stringified property bag suitable for App Insights custom events.""" + props: dict[str, str] = { + "input_tokens": str(self.input_tokens), + "output_tokens": str(self.output_tokens), + "total_tokens": str(self.total_tokens), + } + for name in ( + "input_audio_tokens", + "input_text_tokens", + "input_cached_tokens", + "output_audio_tokens", + "output_text_tokens", + ): + value = getattr(self, name) + if value is not None: + props[name] = str(value) + return props + + +# --------------------------------------------------------------------------- +# Low-level coercion helpers +# --------------------------------------------------------------------------- +def _to_int(value: Any, default: int = 0) -> int: + """Best-effort int conversion; bool excluded; never raises.""" + if value is None or isinstance(value, bool): + return default + if isinstance(value, int): + return value + if isinstance(value, float): + return int(value) + if isinstance(value, str): + s = value.strip() + if s.isdigit(): + return int(s) + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _get(obj: Any, key: str, default: Any = None) -> Any: + """Read an attribute or dict key uniformly.""" + if obj is None: + return default + if isinstance(obj, Mapping): + return obj.get(key, default) + return getattr(obj, key, default) + + +def _is_iterable(obj: Any) -> bool: + """True only for real iterables (lists/tuples/sets/generators), NOT for + arbitrary objects that happen to expose ``__iter__``.""" + if obj is None: + return False + if isinstance(obj, (list, tuple, set, frozenset)): + return True + if isinstance(obj, (str, bytes, bytearray, Mapping)): + return False + if isinstance(obj, NonCallableMock): + return False + return hasattr(obj, "__iter__") + + +def _read_counts(usage_obj: Any) -> Optional[TokenUsage]: + """Read ``input/output/total`` from any usage-bearing object/dict.""" + if usage_obj is None: + return None + + inp = out = tot = 0 + for k in _INPUT_KEYS: + v = _get(usage_obj, k) + if v: + inp = _to_int(v) + break + for k in _OUTPUT_KEYS: + v = _get(usage_obj, k) + if v: + out = _to_int(v) + break + for k in _TOTAL_KEYS: + v = _get(usage_obj, k) + if v: + tot = _to_int(v) + break + + if tot == 0 and (inp or out): + tot = inp + out + if not (inp or out or tot): + return None + return TokenUsage(input_tokens=inp, output_tokens=out, total_tokens=tot) + + +# --------------------------------------------------------------------------- +# Extraction -- public +# --------------------------------------------------------------------------- +def extract_usage(result: Any) -> Optional[TokenUsage]: + """Extract usage from an agent_framework run result or ChatCompletion. + + Checks (in order): + 1. ``result.usage_details`` or ``result.usage`` + 2. ``result.raw_representation.usage`` (OpenAI ChatCompletion shape) + 3. Aggregated ``result.messages[*].contents[*].usage_details`` + + Never raises -- returns ``None`` on any unexpected shape. + """ + if result is None: + return None + + try: + for attr in ("usage_details", "usage"): + found = _read_counts(_get(result, attr)) + if found: + return found + + raw = _get(result, "raw_representation") + if raw is not None: + found = _read_counts(_get(raw, "usage")) + if found: + return found + + aggregated = TokenUsage() + found_any = False + messages = _get(result, "messages") + if not _is_iterable(messages): + return None + for msg in messages: + contents = _get(msg, "contents") + if not _is_iterable(contents): + continue + for content in contents: + usage = _get(content, "usage_details") or _get(content, "usage") + piece = _read_counts(usage) + if piece: + aggregated = aggregated + piece + found_any = True + return aggregated if found_any else None + except Exception as exc: + logger.debug("extract_usage failed: %s", exc, exc_info=True) + return None + + +def extract_usage_from_dict(data: Any) -> Optional[TokenUsage]: + """Extract from a raw dict / SDK usage object.""" + return _read_counts(data) + + +def extract_usage_from_stream_chunk(chunk: Any) -> Optional[TokenUsage]: + """Streaming chunks: try the top-level shape, then ``chunk.metadata.usage``.""" + found = extract_usage(chunk) + if found: + return found + metadata = _get(chunk, "metadata") + if metadata is not None: + return _read_counts(_get(metadata, "usage")) + return None + + +def extract_realtime_usage(response_obj: Any) -> Optional[TokenUsage]: + """Azure AI Voice Live ``response.done`` payload extractor. + + Includes audio / text / cached sub-counts when present. + """ + usage = _get(response_obj, "usage") + if usage is None: + return None + + inp = _to_int(_get(usage, "input_tokens")) + out = _to_int(_get(usage, "output_tokens")) + tot = _to_int(_get(usage, "total_tokens")) + if tot == 0 and (inp or out): + tot = inp + out + + in_details = _get(usage, "input_token_details") or {} + out_details = _get(usage, "output_token_details") or {} + + record = TokenUsage( + input_tokens=inp, + output_tokens=out, + total_tokens=tot, + input_audio_tokens=_to_int(_get(in_details, "audio_tokens")) if _get(in_details, "audio_tokens") is not None else None, + input_text_tokens=_to_int(_get(in_details, "text_tokens")) if _get(in_details, "text_tokens") is not None else None, + input_cached_tokens=_to_int(_get(in_details, "cached_tokens")) if _get(in_details, "cached_tokens") is not None else None, + output_audio_tokens=_to_int(_get(out_details, "audio_tokens")) if _get(out_details, "audio_tokens") is not None else None, + output_text_tokens=_to_int(_get(out_details, "text_tokens")) if _get(out_details, "text_tokens") is not None else None, + ) + if record.has_any or any( + v for v in ( + record.input_audio_tokens, + record.input_text_tokens, + record.input_cached_tokens, + record.output_audio_tokens, + record.output_text_tokens, + ) + ): + return record + return None + + +# --------------------------------------------------------------------------- +# Tool / sub-agent attribution +# --------------------------------------------------------------------------- +def detect_invoked_tools(result: Any) -> set[str]: + """Return the set of tool/function names invoked in an agent result. + + Used by orchestrators that expose sub-agents via ``.as_tool()`` to attribute + token usage only to the sub-agents that were actually called. Never raises. + """ + invoked: set[str] = set() + try: + messages = _get(result, "messages") + if not _is_iterable(messages): + return invoked + for msg in messages: + contents = _get(msg, "contents") + if not _is_iterable(contents): + continue + for content in contents: + if _get(content, "type") == "function_call": + name = _get(content, "name") + if name: + invoked.add(str(name)) + except Exception as exc: + logger.debug("detect_invoked_tools failed: %s", exc, exc_info=True) + return invoked + + +# --------------------------------------------------------------------------- +# Event sink (optional Application Insights dependency) +# --------------------------------------------------------------------------- +EventSink = Callable[[str, Mapping[str, str]], None] + + +def _default_event_sink() -> Optional[EventSink]: + """Return ``azure.monitor.events.extension.track_event`` if importable, + else ``None``.""" + try: + from azure.monitor.events.extension import track_event # type: ignore + except Exception: # pragma: no cover - optional dep + return None + return track_event + + +# --------------------------------------------------------------------------- +# Emitter +# --------------------------------------------------------------------------- +class TokenUsageEmitter: + """Emit standardized token-usage custom events to Application Insights. + + Responsibilities: + 1. Emit LLM_Agent_Token_Usage, LLM_Model_Token_Usage, and + LLM_Token_Usage_Summary events with consistent property schemas. + 2. Optionally sample high-cardinality events while always emitting + the summary event for accurate per-request totals. + 3. Support per-model pricing for estimated cost calculation. + 4. Hash user_id values for PII/GDPR compliance when configured. + + Attributes: + perf_slow_emit_threshold_ms: Soft threshold (ms) above which a + WARNING is logged for an individual emit call. + """ + + def __init__( + self, + *, + connection_string: Optional[str] = None, + static_dimensions: Optional[Mapping[str, Any]] = None, + event_sink: Optional[EventSink] = None, + pricing: Optional[Mapping[str, tuple[float, float]]] = None, + user_id_hasher: Optional[Callable[[str], str]] = None, + sample_rate: float = 1.0, + logger: Optional[logging.Logger] = None, + ) -> None: + self._cs = connection_string if connection_string is not None else os.getenv( + "APPLICATIONINSIGHTS_CONNECTION_STRING" + ) + self._sink = event_sink if event_sink is not None else _default_event_sink() + self._log = logger or logging.getLogger(__name__) + + self._user_id_hasher = user_id_hasher + + try: + sr = float(sample_rate) + except (TypeError, ValueError): + sr = 1.0 + self._sample_rate = max(0.0, min(1.0, sr)) + + self._pricing: dict[str, tuple[float, float]] = {} + for model, rates in (pricing or {}).items(): + if not model or rates is None: + continue + try: + inp, out = rates + self._pricing[str(model).lower()] = (float(inp), float(out)) + except (TypeError, ValueError): + self._log.warning("Ignoring malformed pricing entry: %s=%r", model, rates) + + raw_static = dict(static_dimensions or {}) + if "user_id" in raw_static: + raw_static["user_id"] = self._apply_user_id_hash(raw_static["user_id"]) + self._static: dict[str, str] = { + k: ("" if v is None else str(v)) for k, v in raw_static.items() + } + + self._perf_total_ns: int = 0 + self._perf_emit_count: int = 0 + self._perf_max_ns: int = 0 + self.perf_slow_emit_threshold_ms: float = 50.0 + + @property + def enabled(self) -> bool: + """True when App Insights connection string and event sink are available.""" + return bool(self._cs) and self._sink is not None + + @property + def sample_rate(self) -> float: + """Current sampling rate for high-cardinality events.""" + return self._sample_rate + + def _apply_user_id_hash(self, value: Any) -> Any: + """Apply the configured user_id_hasher; never raises.""" + if value is None or value == "" or self._user_id_hasher is None: + return value + try: + return self._user_id_hasher(str(value)) + except Exception as exc: + self._log.warning("user_id_hasher raised: %s", exc) + return value + + def _should_sample(self) -> bool: + """Sampling decision for high-cardinality events.""" + if self._sample_rate >= 1.0: + return True + if self._sample_rate <= 0.0: + return False + return random.random() < self._sample_rate + + def _cost_props( + self, model_deployment_name: Optional[str], usage: TokenUsage + ) -> dict[str, str]: + """Return ``{'estimated_cost_usd': '...'}`` when pricing is configured.""" + if not self._pricing or not model_deployment_name: + return {} + rate = self._pricing.get(model_deployment_name.lower()) + if not rate: + return {} + inp_rate, out_rate = rate + cost = (usage.input_tokens * inp_rate + usage.output_tokens * out_rate) / 1000.0 + return {"estimated_cost_usd": f"{cost:.6f}"} + + def _summary_cost_props( + self, + primary_model: Optional[str], + additional_agents: Mapping[str, str], + usage: TokenUsage, + ) -> dict[str, str]: + """Best-effort cost for the summary event.""" + if primary_model: + cost = self._cost_props(primary_model, usage) + if cost: + return cost + for m in additional_agents.values(): + cost = self._cost_props(m, usage) + if cost: + return cost + return {} + + def emit(self, event_name: str, **dimensions: Any) -> None: + """Low-level: emit an event with arbitrary properties. Never raises.""" + start_ns = time.perf_counter_ns() + try: + props = dict(self._static) + for k, v in dimensions.items(): + if v is None: + continue + if k == "user_id": + v = self._apply_user_id_hash(v) + if v is None or v == "": + continue + props[k] = v if isinstance(v, str) else str(v) + + # Deterministic event_id for deduplication across services. + # Key fields: event_name + process_id + agent_name + model + dedup_parts = [ + event_name, + props.get("process_id", ""), + props.get("agent_name", ""), + props.get("model_deployment_name", ""), + ] + props["event_id"] = hashlib.sha256( + "|".join(dedup_parts).encode() + ).hexdigest()[:16] + + if not self.enabled: + self._log.debug( + "App Insights not configured -- skipping event %s (%s)", + event_name, props, + ) + return + try: + self._sink(event_name, props) # type: ignore[misc] + except Exception as exc: + self._log.warning("track_event(%s) failed: %s", event_name, exc) + finally: + elapsed_ns = time.perf_counter_ns() - start_ns + self._perf_total_ns += elapsed_ns + self._perf_emit_count += 1 + if elapsed_ns > self._perf_max_ns: + self._perf_max_ns = elapsed_ns + elapsed_ms = elapsed_ns / 1_000_000.0 + if elapsed_ms > self.perf_slow_emit_threshold_ms: + self._log.warning( + "Token telemetry emit slow: event=%s duration_ms=%.3f", + event_name, elapsed_ms, + ) + else: + self._log.debug( + "Token telemetry emit: event=%s duration_ms=%.3f", + event_name, elapsed_ms, + ) + + def perf_stats(self) -> dict[str, float]: + """Return cumulative telemetry-overhead stats. + + Returns: + Dict with keys: emit_count, total_ms, avg_ms, max_ms. + """ + count = self._perf_emit_count + total_ms = self._perf_total_ns / 1_000_000.0 + return { + "emit_count": float(count), + "total_ms": total_ms, + "avg_ms": (total_ms / count) if count else 0.0, + "max_ms": self._perf_max_ns / 1_000_000.0, + } + + def reset_perf_stats(self) -> None: + """Zero the perf counters.""" + self._perf_total_ns = 0 + self._perf_emit_count = 0 + self._perf_max_ns = 0 + + def emit_agent( + self, + *, + agent_name: str, + model_deployment_name: str, + usage: TokenUsage, + **dimensions: Any, + ) -> None: + """Emit a per-agent token usage event.""" + if not usage.has_any or not self._should_sample(): + return + self.emit( + EVENT_AGENT, + agent_name=agent_name, + model_deployment_name=model_deployment_name, + **usage.to_event_props(), + **self._cost_props(model_deployment_name, usage), + **dimensions, + ) + + def emit_model( + self, + *, + model_deployment_name: str, + usage: TokenUsage, + **dimensions: Any, + ) -> None: + """Emit a per-model token usage event.""" + if not usage.has_any or not self._should_sample(): + return + self.emit( + EVENT_MODEL, + model_deployment_name=model_deployment_name, + **usage.to_event_props(), + **self._cost_props(model_deployment_name, usage), + **dimensions, + ) + + def emit_user( + self, + *, + user_id: str, + usage: TokenUsage, + **dimensions: Any, + ) -> None: + """Emit a per-user token usage event.""" + if not usage.has_any or not user_id or not self._should_sample(): + return + self.emit( + EVENT_USER, + user_id=user_id, + **usage.to_event_props(), + **dimensions, + ) + + def emit_team( + self, + *, + team_name: str, + usage: TokenUsage, + **dimensions: Any, + ) -> None: + """Emit a per-team token usage event.""" + if not usage.has_any or not team_name or not self._should_sample(): + return + self.emit( + EVENT_TEAM, + team_name=team_name, + **usage.to_event_props(), + **dimensions, + ) + + def emit_summary( + self, + *, + usage: TokenUsage, + agent_count: int = 1, + model_count: int = 1, + primary_model: Optional[str] = None, + additional_agents: Optional[Mapping[str, str]] = None, + **dimensions: Any, + ) -> None: + """Emit the summary event (always fires, ignores sample_rate).""" + if not usage.has_any: + return + props = { + "total_input_tokens": str(usage.input_tokens), + "total_output_tokens": str(usage.output_tokens), + "total_tokens": str(usage.total_tokens), + "agent_count": str(agent_count), + "model_count": str(model_count), + "sample_rate": f"{self._sample_rate:.4f}", + } + for k, v in usage.to_event_props().items(): + props.setdefault(k, v) + props.update(self._summary_cost_props(primary_model, additional_agents or {}, usage)) + self.emit(EVENT_SUMMARY, **props, **dimensions) + + def emit_speech( + self, + *, + model_deployment_name: str, + source: str, + usage: TokenUsage, + **dimensions: Any, + ) -> None: + """Voice-Live / realtime speech usage event.""" + if not self._should_sample(): + return + self.emit( + EVENT_SPEECH, + model_deployment_name=model_deployment_name, + source=source, + **usage.to_event_props(), + **self._cost_props(model_deployment_name, usage), + **dimensions, + ) + + def emit_all( + self, + *, + agent_name: str, + model_deployment_name: str, + usage: TokenUsage, + additional_agents: Optional[Mapping[str, str]] = None, + emit_user_event: bool = False, + emit_team_event: bool = False, + **dimensions: Any, + ) -> None: + """Emit summary, agent, and one model event per distinct model deployment. + + Args: + agent_name: Name of the primary agent/step. + model_deployment_name: Model deployment used by the primary agent. + usage: Accumulated token usage for this invocation. + additional_agents: Maps sub-agent name -> model deployment name. + emit_user_event: Opt in to per-user events. + emit_team_event: Opt in to per-team events. + **dimensions: Extra properties forwarded to all events. + """ + if not usage.has_any: + return + + agents = {agent_name: model_deployment_name} + if additional_agents: + agents.update({k: v for k, v in additional_agents.items() if k}) + models = {m for m in agents.values() if m} + + batch_start_ns = time.perf_counter_ns() + + self.emit_agent( + agent_name=agent_name, + model_deployment_name=model_deployment_name, + usage=usage, + **dimensions, + ) + for model in models: + self.emit_model( + model_deployment_name=model, + usage=usage, + **dimensions, + ) + if emit_user_event and dimensions.get("user_id"): + self.emit_user( + user_id=str(dimensions["user_id"]), + usage=usage, + agent_name=agent_name, + model_deployment_name=model_deployment_name, + ) + if emit_team_event and dimensions.get("team_name"): + self.emit_team( + team_name=str(dimensions["team_name"]), + usage=usage, + agent_name=agent_name, + model_deployment_name=model_deployment_name, + ) + + batch_overhead_ms = (time.perf_counter_ns() - batch_start_ns) / 1_000_000.0 + self.emit_summary( + usage=usage, + agent_count=len(agents), + model_count=len(models) or 1, + primary_model=model_deployment_name, + additional_agents=additional_agents, + telemetry_overhead_ms=f"{batch_overhead_ms:.3f}", + **dimensions, + ) + + self._log.debug( + "[TOKEN USAGE] agent=%s model=%s input=%d output=%d total=%d %s", + agent_name, + model_deployment_name, + usage.input_tokens, + usage.output_tokens, + usage.total_tokens, + " ".join(f"{k}={v}" for k, v in dimensions.items() if v), + ) + + +# --------------------------------------------------------------------------- +# Scope / decorator sugar +# --------------------------------------------------------------------------- +@dataclass +class TokenUsageScope(AbstractContextManager): + """Accumulate usage across multiple results, then emit on exit. + + Example:: + + with TokenUsageScope(emitter, + agent_name="MapHandler", + model_deployment_name=cfg.model, + process_id=pid) as scope: + result = await agent.run(prompt) + scope.add(result) + + Attributes: + emitter: The TokenUsageEmitter instance to use for emission. + agent_name: Name of the agent/step being tracked. + model_deployment_name: Model deployment name for attribution. + dimensions: Extra properties forwarded to all events. + additional_agents: Maps sub-agent name -> model deployment name. + emit_user_event: Whether to emit per-user events. + emit_team_event: Whether to emit per-team events. + usage: Accumulated TokenUsage so far. + """ + + emitter: TokenUsageEmitter + agent_name: str + model_deployment_name: str + dimensions: dict[str, Any] = field(default_factory=dict) + additional_agents: dict[str, str] = field(default_factory=dict) + emit_user_event: bool = False + emit_team_event: bool = False + usage: TokenUsage = field(default_factory=TokenUsage) + + def __init__( + self, + emitter: TokenUsageEmitter, + *, + agent_name: str, + model_deployment_name: str, + additional_agents: Optional[Mapping[str, str]] = None, + emit_user_event: bool = False, + emit_team_event: bool = False, + **dimensions: Any, + ) -> None: + self.emitter = emitter + self.agent_name = agent_name + self.model_deployment_name = model_deployment_name + self.additional_agents = dict(additional_agents or {}) + self.emit_user_event = emit_user_event + self.emit_team_event = emit_team_event + self.dimensions = dict(dimensions) + self.usage = TokenUsage() + self._extract_ns: int = 0 + self._emit_ns: int = 0 + + def add(self, source: Any) -> Optional[TokenUsage]: + """Extract usage from any supported shape and add to the running total. + + Args: + source: Agent run result, ChatMessage, or ChatCompletion object. + + Returns: + The extracted TokenUsage, or None if extraction failed. + """ + start_ns = time.perf_counter_ns() + try: + found = extract_usage(source) + except Exception as exc: + logger.debug("TokenUsageScope.add failed: %s", exc, exc_info=True) + return None + finally: + self._extract_ns += time.perf_counter_ns() - start_ns + if found: + self.usage = self.usage + found + return found + + def add_usage(self, usage: TokenUsage) -> None: + """Add a pre-constructed TokenUsage to the running total.""" + self.usage = self.usage + usage + + def add_chunks(self, chunks: Iterable[Any]) -> None: + """Extract and accumulate usage from a stream of chunks.""" + for c in chunks: + self.add(c) + + @property + def extract_ms(self) -> float: + """Total ms spent inside :meth:`add` / :meth:`add_chunks`.""" + return self._extract_ns / 1_000_000.0 + + @property + def emit_ms(self) -> float: + """Total ms spent in the on-exit emit batch.""" + return self._emit_ns / 1_000_000.0 + + @property + def total_overhead_ms(self) -> float: + """Total telemetry overhead added by this scope (extract + emit).""" + return self.extract_ms + self.emit_ms + + def __exit__(self, exc_type, exc, tb) -> None: + emit_start_ns = time.perf_counter_ns() + try: + self.emitter.emit_all( + agent_name=self.agent_name, + model_deployment_name=self.model_deployment_name, + usage=self.usage, + additional_agents=self.additional_agents, + emit_user_event=self.emit_user_event, + emit_team_event=self.emit_team_event, + **self.dimensions, + ) + except Exception as emit_exc: # pragma: no cover + logger.warning("TokenUsageScope emit failed: %s", emit_exc) + finally: + self._emit_ns += time.perf_counter_ns() - emit_start_ns + logger.debug( + "TokenUsageScope overhead: agent=%s extract_ms=%.3f " + "emit_ms=%.3f total_ms=%.3f", + self.agent_name, + self.extract_ms, + self.emit_ms, + self.total_overhead_ms, + ) + return None + + +def track_tokens( + emitter: TokenUsageEmitter, + *, + agent_name: str, + model_deployment_name: str, + dimension_args: Optional[Mapping[str, str]] = None, + additional_agents: Optional[Mapping[str, str]] = None, + emit_user_event: bool = False, + emit_team_event: bool = False, +): + """Decorator: wrap an async or sync function that returns an LLM result. + + Args: + emitter: TokenUsageEmitter to use. + agent_name: Name of the agent/step. + model_deployment_name: Model deployment name. + dimension_args: Maps emitted-property-name -> callable-keyword-argument. + additional_agents: Sub-agent name -> model deployment name mapping. + emit_user_event: Opt in to per-user events. + emit_team_event: Opt in to per-team events. + """ + + dim_args = dict(dimension_args or {}) + + def _decorator(fn: Callable[..., Any]): + is_coro = _is_coroutine_function(fn) + + if is_coro: + @functools.wraps(fn) + async def _aw(*args, **kwargs) -> Any: + with _scope_for(kwargs) as scope: + result = await fn(*args, **kwargs) + scope.add(result) + return result + return _aw + + @functools.wraps(fn) + def _sw(*args, **kwargs) -> Any: + with _scope_for(kwargs) as scope: + result = fn(*args, **kwargs) + scope.add(result) + return result + return _sw + + def _scope_for(call_kwargs: Mapping[str, Any]) -> TokenUsageScope: + dimensions = { + prop: call_kwargs.get(kw) + for prop, kw in dim_args.items() + if call_kwargs.get(kw) is not None + } + return TokenUsageScope( + emitter, + agent_name=agent_name, + model_deployment_name=model_deployment_name, + additional_agents=additional_agents, + emit_user_event=emit_user_event, + emit_team_event=emit_team_event, + **dimensions, + ) + + return _decorator + + +def _is_coroutine_function(fn: Callable[..., Any]) -> bool: + return asyncio.iscoroutinefunction(fn) + + +__all__ = [ + "EVENT_SUMMARY", + "EVENT_AGENT", + "EVENT_MODEL", + "EVENT_USER", + "EVENT_TEAM", + "EVENT_SPEECH", + "TokenUsage", + "TokenUsageEmitter", + "TokenUsageScope", + "track_tokens", + "extract_usage", + "extract_usage_from_dict", + "extract_usage_from_stream_chunk", + "extract_realtime_usage", + "detect_invoked_tools", +] diff --git a/src/ContentProcessor/src/libs/pipeline/handlers/map_handler.py b/src/ContentProcessor/src/libs/pipeline/handlers/map_handler.py index f3f20cb3..2bcfb5ad 100644 --- a/src/ContentProcessor/src/libs/pipeline/handlers/map_handler.py +++ b/src/ContentProcessor/src/libs/pipeline/handlers/map_handler.py @@ -28,6 +28,8 @@ from libs.pipeline.entities.pipeline_step_result import StepResult from libs.pipeline.entities.schema import Schema from libs.pipeline.queue_handler_base import HandlerBase +from libs.llm_token_telemetry import TokenUsageScope +from libs.telemetry import token_emitter from libs.utils.remote_schema_loader import load_schema_from_blob_json logger = logging.getLogger(__name__) @@ -263,6 +265,18 @@ async def execute(self, context: MessageContext) -> StepResult: options={"logprobs": True, "top_logprobs": 5}, ) + # Track token usage for this LLM call + source_file = context.data_pipeline.get_source_files()[0] + with TokenUsageScope( + token_emitter, + agent_name="MapHandler", + model_deployment_name=self.application_context.configuration.app_azure_openai_model, + process_id=context.data_pipeline.pipeline_status.process_id, + file_name=source_file.name, + file_mime_type=source_file.mime_type or "", + ) as scope: + scope.add(gpt_response) + response_content = gpt_response.text # Json format string cleaned_content = ( diff --git a/src/ContentProcessor/src/libs/telemetry.py b/src/ContentProcessor/src/libs/telemetry.py new file mode 100644 index 00000000..0e432ed1 --- /dev/null +++ b/src/ContentProcessor/src/libs/telemetry.py @@ -0,0 +1,92 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Process-wide telemetry singletons for the content-processing pipeline. + +A single :class:`TokenUsageEmitter` is constructed at import time so every +handler/executor shares the same App Insights connection-string resolution and +static dimensions. Importing this module has no side effects beyond reading +``APPLICATIONINSIGHTS_CONNECTION_STRING`` and the env vars documented below. + +Optional environment variables +------------------------------ +LLM_TOKEN_SAMPLE_RATE + Float in [0, 1]. Fraction of high-cardinality token events + (agent/model/user/team/speech) to ship. The summary event always fires. + Defaults to ``1.0``. + +LLM_TOKEN_USER_ID_HMAC_KEY + When set, ``user_id`` values are replaced with an HMAC-SHA256 hex digest + (truncated to 16 chars) before leaving the process. Use to satisfy + GDPR / PII handling requirements without modifying call sites. + +LLM_TOKEN_PRICING + Optional comma-separated list of ``model=in_per_1k:out_per_1k`` entries, + e.g. ``gpt-4o=0.0025:0.01,gpt-4o-mini=0.00015:0.0006``. When set the + emitter attaches ``estimated_cost_usd`` to agent / model / summary + events so dashboards can group by cost without hard-coded KQL rates. +""" +from __future__ import annotations + +import hashlib +import hmac +import logging +import os +from typing import Callable, Optional + +from libs.llm_token_telemetry import TokenUsageEmitter + +_log = logging.getLogger(__name__) + + +def _parse_sample_rate() -> float: + raw = os.getenv("LLM_TOKEN_SAMPLE_RATE") + if not raw: + return 1.0 + try: + return max(0.0, min(1.0, float(raw))) + except ValueError: + _log.warning("Invalid LLM_TOKEN_SAMPLE_RATE=%r; defaulting to 1.0", raw) + return 1.0 + + +def _build_user_id_hasher() -> Optional[Callable[[str], str]]: + key = os.getenv("LLM_TOKEN_USER_ID_HMAC_KEY") + if not key: + return None + key_bytes = key.encode("utf-8") + + def _hash(value: str) -> str: + digest = hmac.new(key_bytes, value.encode("utf-8"), hashlib.sha256).hexdigest() + return digest[:16] + + return _hash + + +def _parse_pricing() -> dict[str, tuple[float, float]]: + raw = os.getenv("LLM_TOKEN_PRICING") + if not raw: + return {} + pricing: dict[str, tuple[float, float]] = {} + for entry in raw.split(","): + entry = entry.strip() + if not entry or "=" not in entry: + continue + model, rates = entry.split("=", 1) + if ":" not in rates: + continue + in_s, out_s = rates.split(":", 1) + try: + pricing[model.strip().lower()] = (float(in_s), float(out_s)) + except ValueError: + _log.warning("Ignoring malformed pricing entry: %s", entry) + return pricing + + +token_emitter = TokenUsageEmitter( + static_dimensions={"app": "content-processing"}, + sample_rate=_parse_sample_rate(), + user_id_hasher=_build_user_id_hasher(), + pricing=_parse_pricing(), +) + +__all__ = ["token_emitter"] diff --git a/src/ContentProcessor/tests/unit/libs/test_token_usage_utils.py b/src/ContentProcessor/tests/unit/libs/test_token_usage_utils.py new file mode 100644 index 00000000..f68cc76e --- /dev/null +++ b/src/ContentProcessor/tests/unit/libs/test_token_usage_utils.py @@ -0,0 +1,371 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for libs.llm_token_telemetry (standardized token usage telemetry).""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +from libs.llm_token_telemetry import ( + TokenUsage, + TokenUsageEmitter, + TokenUsageScope, + _to_int, + extract_usage, + extract_usage_from_dict, + detect_invoked_tools, +) + + +# ── _to_int helper ───────────────────────────────────────────────────── + + +class TestToInt: + """Conversion helper for safely casting token counts.""" + + def test_none_returns_default(self): + assert _to_int(None) == 0 + + def test_bool_returns_default(self): + assert _to_int(True) == 0 + assert _to_int(False) == 0 + + def test_int_passthrough(self): + assert _to_int(42) == 42 + + def test_float_truncates(self): + assert _to_int(3.7) == 3 + + def test_digit_string(self): + assert _to_int("100") == 100 + + def test_non_digit_string_returns_default(self): + assert _to_int("abc") == 0 + + def test_custom_default(self): + assert _to_int(None, default=5) == 5 + + +# ── TokenUsage dataclass ────────────────────────────────────────────── + + +class TestTokenUsage: + """Immutable token-usage record with addition support.""" + + def test_defaults_to_zero(self): + usage = TokenUsage() + assert usage.input_tokens == 0 + assert usage.output_tokens == 0 + assert usage.total_tokens == 0 + assert not usage.has_any + + def test_has_any_true_when_nonzero(self): + assert TokenUsage(input_tokens=1).has_any + assert TokenUsage(output_tokens=1).has_any + assert TokenUsage(total_tokens=1).has_any + + def test_addition(self): + a = TokenUsage(input_tokens=100, output_tokens=50, total_tokens=150) + b = TokenUsage(input_tokens=200, output_tokens=80, total_tokens=280) + result = a + b + assert result.input_tokens == 300 + assert result.output_tokens == 130 + assert result.total_tokens == 430 + + def test_to_event_props(self): + usage = TokenUsage(input_tokens=10, output_tokens=5, total_tokens=15) + props = usage.to_event_props() + assert props == { + "input_tokens": "10", + "output_tokens": "5", + "total_tokens": "15", + } + + +# ── extract_usage ────────────────────────────────────────────────────── + + +class TestExtractUsage: + """Token extraction from various response shapes.""" + + def test_usage_details_dict_with_standard_keys(self): + response = MagicMock() + response.usage_details = { + "input_token_count": 100, + "output_token_count": 50, + "total_token_count": 150, + } + result = extract_usage(response) + assert result == TokenUsage(input_tokens=100, output_tokens=50, total_tokens=150) + + def test_usage_details_dict_with_openai_keys(self): + response = MagicMock() + response.usage_details = { + "prompt_tokens": 200, + "completion_tokens": 80, + "total_tokens": 280, + } + result = extract_usage(response) + assert result == TokenUsage(input_tokens=200, output_tokens=80, total_tokens=280) + + def test_usage_details_none_falls_to_raw_representation(self): + response = MagicMock() + response.usage_details = None + response.usage = None + usage_obj = MagicMock() + usage_obj.prompt_tokens = 300 + usage_obj.completion_tokens = 120 + usage_obj.total_tokens = 420 + usage_obj.input_tokens = 0 + usage_obj.output_tokens = 0 + usage_obj.input_token_count = 0 + usage_obj.output_token_count = 0 + usage_obj.total_token_count = 0 + usage_obj.promptTokens = 0 + usage_obj.completionTokens = 0 + usage_obj.totalTokens = 0 + response.raw_representation.usage = usage_obj + result = extract_usage(response) + assert result == TokenUsage(input_tokens=300, output_tokens=120, total_tokens=420) + + def test_raw_representation_dict_usage(self): + response = MagicMock() + response.usage_details = None + response.usage = None + response.raw_representation.usage = { + "prompt_tokens": 50, + "completion_tokens": 25, + "total_tokens": 75, + } + result = extract_usage(response) + assert result == TokenUsage(input_tokens=50, output_tokens=25, total_tokens=75) + + def test_usage_details_object_with_attributes(self): + """Handle UsageDetails object (not dict) from agent framework.""" + response = MagicMock() + usage_obj = MagicMock() + usage_obj.input_token_count = 400 + usage_obj.output_token_count = 150 + usage_obj.total_token_count = 550 + response.usage_details = usage_obj + result = extract_usage(response) + assert result == TokenUsage(input_tokens=400, output_tokens=150, total_tokens=550) + + def test_none_returns_none(self): + assert extract_usage(None) is None + + def test_no_usage_returns_none(self): + response = MagicMock() + response.usage_details = None + response.usage = None + response.raw_representation = None + response.messages = None + result = extract_usage(response) + assert result is None + + def test_total_computed_from_input_output_when_missing(self): + response = MagicMock() + response.usage_details = { + "input_token_count": 100, + "output_token_count": 50, + } + result = extract_usage(response) + assert result.total_tokens == 150 + + +# ── extract_usage_from_dict ─────────────────────────────────────────── + + +class TestExtractUsageFromDict: + """Extraction from raw dict / SDK usage objects.""" + + def test_dict_with_standard_keys(self): + result = extract_usage_from_dict({ + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + }) + assert result == TokenUsage(input_tokens=100, output_tokens=50, total_tokens=150) + + def test_none_returns_none(self): + assert extract_usage_from_dict(None) is None + + +# ── detect_invoked_tools ────────────────────────────────────────────── + + +class TestDetectInvokedTools: + """Tool detection from agent result messages.""" + + def test_detects_function_calls(self): + content1 = MagicMock() + content1.type = "function_call" + content1.name = "product_agent" + content2 = MagicMock() + content2.type = "text" + content2.name = None + msg = MagicMock() + msg.contents = [content1, content2] + result_obj = MagicMock() + result_obj.messages = [msg] + invoked = detect_invoked_tools(result_obj) + assert invoked == {"product_agent"} + + def test_returns_empty_for_none(self): + assert detect_invoked_tools(None) == set() + + +# ── TokenUsageEmitter ───────────────────────────────────────────────── + + +class TestTokenUsageEmitter: + """Custom event emission via the standardized emitter.""" + + def test_emit_agent_calls_sink(self): + sink = MagicMock() + emitter = TokenUsageEmitter( + connection_string="test", + event_sink=sink, + static_dimensions={"app": "content-processing"}, + ) + usage = TokenUsage(input_tokens=100, output_tokens=50, total_tokens=150) + emitter.emit_agent( + agent_name="MapHandler", + model_deployment_name="gpt-4o", + usage=usage, + process_id="proc-123", + ) + sink.assert_called_once() + call_args = sink.call_args + assert call_args[0][0] == "LLM_Agent_Token_Usage" + props = call_args[0][1] + assert props["agent_name"] == "MapHandler" + assert props["input_tokens"] == "100" + assert props["app"] == "content-processing" + + def test_emit_all_emits_agent_model_summary(self): + sink = MagicMock() + emitter = TokenUsageEmitter( + connection_string="test", + event_sink=sink, + static_dimensions={"app": "content-processing"}, + ) + usage = TokenUsage(input_tokens=200, output_tokens=80, total_tokens=280) + emitter.emit_all( + agent_name="RAI", + model_deployment_name="gpt-4o", + usage=usage, + process_id="proc-456", + ) + event_names = [call[0][0] for call in sink.call_args_list] + assert "LLM_Agent_Token_Usage" in event_names + assert "LLM_Model_Token_Usage" in event_names + assert "LLM_Token_Usage_Summary" in event_names + + def test_emit_all_agent_count_correct(self): + sink = MagicMock() + emitter = TokenUsageEmitter( + connection_string="test", + event_sink=sink, + ) + usage = TokenUsage(input_tokens=100, output_tokens=50, total_tokens=150) + emitter.emit_all( + agent_name="MapHandler", + model_deployment_name="gpt-4o", + usage=usage, + ) + # Find the summary event call + summary_call = next( + call for call in sink.call_args_list + if call[0][0] == "LLM_Token_Usage_Summary" + ) + props = summary_call[0][1] + assert props["agent_count"] == "1" + assert props["model_count"] == "1" + + def test_emit_skips_when_not_configured(self): + emitter = TokenUsageEmitter(connection_string=None, event_sink=None) + assert not emitter.enabled + # Should not raise + emitter.emit("test_event", key="value") + + def test_perf_stats(self): + sink = MagicMock() + emitter = TokenUsageEmitter(connection_string="test", event_sink=sink) + emitter.emit("test_event") + stats = emitter.perf_stats() + assert stats["emit_count"] == 1.0 + assert stats["total_ms"] >= 0 + + +# ── TokenUsageScope ────────────────────────────────────────────────── + + +class TestTokenUsageScope: + """Context manager that accumulates usage and emits on exit.""" + + def test_scope_emits_on_exit(self): + sink = MagicMock() + emitter = TokenUsageEmitter( + connection_string="test", + event_sink=sink, + static_dimensions={"app": "content-processing"}, + ) + response = MagicMock() + response.usage_details = { + "input_token_count": 100, + "output_token_count": 50, + "total_token_count": 150, + } + with TokenUsageScope( + emitter, + agent_name="MapHandler", + model_deployment_name="gpt-4o", + process_id="proc-123", + ) as scope: + scope.add(response) + + assert scope.usage.input_tokens == 100 + assert scope.usage.output_tokens == 50 + event_names = [call[0][0] for call in sink.call_args_list] + assert "LLM_Agent_Token_Usage" in event_names + assert "LLM_Token_Usage_Summary" in event_names + + def test_scope_handles_no_usage(self): + sink = MagicMock() + emitter = TokenUsageEmitter(connection_string="test", event_sink=sink) + response = MagicMock() + response.usage_details = None + response.usage = None + response.raw_representation = None + response.messages = None + with TokenUsageScope( + emitter, + agent_name="Test", + model_deployment_name="gpt-4o", + ) as scope: + scope.add(response) + + assert not scope.usage.has_any + # No events should fire for zero usage + sink.assert_not_called() + + def test_scope_accumulates_multiple_adds(self): + sink = MagicMock() + emitter = TokenUsageEmitter(connection_string="test", event_sink=sink) + r1 = MagicMock() + r1.usage_details = {"input_token_count": 100, "output_token_count": 50, "total_token_count": 150} + r2 = MagicMock() + r2.usage_details = {"input_token_count": 200, "output_token_count": 80, "total_token_count": 280} + with TokenUsageScope( + emitter, + agent_name="Test", + model_deployment_name="gpt-4o", + ) as scope: + scope.add(r1) + scope.add(r2) + + assert scope.usage.input_tokens == 300 + assert scope.usage.output_tokens == 130 + assert scope.usage.total_tokens == 430 diff --git a/src/ContentProcessor/uv.lock b/src/ContentProcessor/uv.lock index 4dcc1bf0..dcc6f3d4 100644 --- a/src/ContentProcessor/uv.lock +++ b/src/ContentProcessor/uv.lock @@ -698,6 +698,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e1/28/af9ef022f21e3b51b3718d4348f771b490678c1116563895547c0a771362/azure_identity-1.26.0b1-py3-none-any.whl", hash = "sha256:dc608b59ae628a38611208ee761adeb1a2b9390258b58d6edcda2d24c50a4348", size = 197227, upload-time = "2025-11-07T03:04:16.923Z" }, ] +[[package]] +name = "azure-monitor-events-extension" +version = "0.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-sdk" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cd/51/976c8cd4a76d41bcd4d3f6400aeed8fdd70d516d271badf9c4a5893a558d/azure-monitor-events-extension-0.1.0.tar.gz", hash = "sha256:094773685171a50aa5cc548279c9141c8a26682f6acef397815c528b53b838b5", size = 4165, upload-time = "2023-09-19T20:01:17.887Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/09/44/cbb68c55505a604de61caa44375be7371368e71aa8386b1576be5b789e11/azure_monitor_events_extension-0.1.0-py2.py3-none-any.whl", hash = "sha256:5d92abb5e6a32ab23b12c726def9f9607c6fa1d84900d493b906ff9ec489af4a", size = 4514, upload-time = "2023-09-19T20:01:16.162Z" }, +] + [[package]] name = "azure-monitor-opentelemetry" version = "1.8.7" @@ -974,6 +987,7 @@ dependencies = [ { name = "azure-ai-inference" }, { name = "azure-appconfiguration" }, { name = "azure-identity" }, + { name = "azure-monitor-events-extension" }, { name = "azure-monitor-opentelemetry" }, { name = "azure-storage-blob" }, { name = "azure-storage-queue" }, @@ -1011,6 +1025,7 @@ requires-dist = [ { name = "azure-ai-inference", specifier = "==1.0.0b9" }, { name = "azure-appconfiguration", specifier = "==1.8.0" }, { name = "azure-identity", specifier = "==1.26.0b1" }, + { name = "azure-monitor-events-extension", specifier = ">=0.1.0" }, { name = "azure-monitor-opentelemetry", specifier = "==1.8.7" }, { name = "azure-storage-blob", specifier = "==12.29.0b1" }, { name = "azure-storage-queue", specifier = "==12.16.0b1" }, diff --git a/src/ContentProcessorWorkflow/pyproject.toml b/src/ContentProcessorWorkflow/pyproject.toml index 804ed5f4..ef414f38 100644 --- a/src/ContentProcessorWorkflow/pyproject.toml +++ b/src/ContentProcessorWorkflow/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "azure-appconfiguration==1.8.0", "azure-core==1.38.0", "azure-identity==1.26.0b1", + "azure-monitor-events-extension==0.1.0", "azure-monitor-opentelemetry==1.8.7", "azure-storage-blob==12.29.0b1", "azure-storage-file-datalake==12.23.0", diff --git a/src/ContentProcessorWorkflow/src/libs/llm_token_telemetry.py b/src/ContentProcessorWorkflow/src/libs/llm_token_telemetry.py new file mode 100644 index 00000000..ae8aa383 --- /dev/null +++ b/src/ContentProcessorWorkflow/src/libs/llm_token_telemetry.py @@ -0,0 +1,1003 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Cross-accelerator LLM token-usage telemetry helpers. + +A single, dependency-light helper module that can be dropped into any Microsoft +Solution Accelerator to capture LLM token usage and emit standardized custom +events to Application Insights. + +Why this file exists +-------------------- +Seven solution accelerators have independently shipped near-identical +``token_usage_utils.py`` modules. They all: + +* extract token counts from agent_framework / Azure OpenAI responses, +* emit the same three custom events (``LLM_Token_Usage_Summary``, + ``LLM_Agent_Token_Usage``, ``LLM_Model_Token_Usage``), +* defensively swallow telemetry errors, +* duplicate the same KQL queries and Azure Workbook. + +This module consolidates the union of those behaviours behind one stable API +so each accelerator can replace its bespoke helper with an import. + +Public API +---------- +- ``TokenUsage`` -- immutable dataclass for counts +- ``extract_usage(obj)`` -- agent_framework run result / message +- ``extract_usage_from_dict(d)`` -- raw dict from any SDK +- ``extract_usage_from_stream_chunk`` -- streaming chunks +- ``extract_realtime_usage(resp)`` -- Azure AI Voice Live response.done +- ``TokenUsageEmitter`` -- emits the three events + optional + per-user / per-team / speech events +- ``TokenUsageScope`` -- context-manager that accumulates and + auto-emits on exit +- ``track_tokens`` -- decorator wrapper around the scope + +Design rules +------------ +* Telemetry NEVER raises. Extraction failures return ``None``; emission + failures are logged at WARNING. +* No hard dependency on ``azure-monitor-events-extension``; if absent the + emitter degrades to logging only. +* Arbitrary correlation dimensions are passed as ``**dimensions`` kwargs and + surface verbatim as custom-event properties. This is how each accelerator + attaches its own keys (``conversation_id``, ``process_id``, ``team_name``, + ``file_name``, ``tenant``, etc.) without forking the helper. +""" +from __future__ import annotations + +import asyncio +import functools +import hashlib +import logging +import os +import random +import time +from contextlib import AbstractContextManager +from dataclasses import dataclass, field +from typing import Any, Callable, Iterable, Mapping, Optional +from unittest.mock import NonCallableMock + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Event-name constants -- keep these stable; KQL queries and workbooks bind +# to these exact strings. +# --------------------------------------------------------------------------- +EVENT_SUMMARY = "LLM_Token_Usage_Summary" +EVENT_AGENT = "LLM_Agent_Token_Usage" +EVENT_MODEL = "LLM_Model_Token_Usage" +EVENT_USER = "LLM_User_Token_Usage" +EVENT_TEAM = "LLM_Team_Token_Usage" +EVENT_SPEECH = "Speech_Usage" + + +# Token-count field aliases observed across model providers / SDK versions. +_INPUT_KEYS = ( + "input_token_count", + "input_tokens", + "prompt_tokens", + "promptTokens", +) +_OUTPUT_KEYS = ( + "output_token_count", + "output_tokens", + "completion_tokens", + "completionTokens", +) +_TOTAL_KEYS = ( + "total_token_count", + "total_tokens", + "totalTokens", +) + + +# --------------------------------------------------------------------------- +# Data model +# --------------------------------------------------------------------------- +@dataclass(frozen=True) +class TokenUsage: + """Normalized token-usage record. + + Attributes: + input_tokens: Number of input/prompt tokens consumed. + output_tokens: Number of output/completion tokens consumed. + total_tokens: Total token count (input + output). + input_audio_tokens: Audio input tokens (realtime/voice only). + input_text_tokens: Text input tokens (realtime/voice only). + input_cached_tokens: Cached input tokens (realtime/voice only). + output_audio_tokens: Audio output tokens (realtime/voice only). + output_text_tokens: Text output tokens (realtime/voice only). + """ + + input_tokens: int = 0 + output_tokens: int = 0 + total_tokens: int = 0 + + input_audio_tokens: Optional[int] = None + input_text_tokens: Optional[int] = None + input_cached_tokens: Optional[int] = None + output_audio_tokens: Optional[int] = None + output_text_tokens: Optional[int] = None + + @property + def has_any(self) -> bool: + """True if any token count is non-zero.""" + return bool(self.input_tokens or self.output_tokens or self.total_tokens) + + def __add__(self, other: "TokenUsage") -> "TokenUsage": + if not isinstance(other, TokenUsage): + return NotImplemented + + def _sum(a: Optional[int], b: Optional[int]) -> Optional[int]: + if a is None and b is None: + return None + return (a or 0) + (b or 0) + + return TokenUsage( + input_tokens=self.input_tokens + other.input_tokens, + output_tokens=self.output_tokens + other.output_tokens, + total_tokens=self.total_tokens + other.total_tokens, + input_audio_tokens=_sum(self.input_audio_tokens, other.input_audio_tokens), + input_text_tokens=_sum(self.input_text_tokens, other.input_text_tokens), + input_cached_tokens=_sum(self.input_cached_tokens, other.input_cached_tokens), + output_audio_tokens=_sum(self.output_audio_tokens, other.output_audio_tokens), + output_text_tokens=_sum(self.output_text_tokens, other.output_text_tokens), + ) + + def to_event_props(self) -> dict[str, str]: + """Stringified property bag suitable for App Insights custom events.""" + props: dict[str, str] = { + "input_tokens": str(self.input_tokens), + "output_tokens": str(self.output_tokens), + "total_tokens": str(self.total_tokens), + } + for name in ( + "input_audio_tokens", + "input_text_tokens", + "input_cached_tokens", + "output_audio_tokens", + "output_text_tokens", + ): + value = getattr(self, name) + if value is not None: + props[name] = str(value) + return props + + +# --------------------------------------------------------------------------- +# Low-level coercion helpers +# --------------------------------------------------------------------------- +def _to_int(value: Any, default: int = 0) -> int: + """Best-effort int conversion; bool excluded; never raises.""" + if value is None or isinstance(value, bool): + return default + if isinstance(value, int): + return value + if isinstance(value, float): + return int(value) + if isinstance(value, str): + s = value.strip() + if s.isdigit(): + return int(s) + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _get(obj: Any, key: str, default: Any = None) -> Any: + """Read an attribute or dict key uniformly.""" + if obj is None: + return default + if isinstance(obj, Mapping): + return obj.get(key, default) + return getattr(obj, key, default) + + +def _is_iterable(obj: Any) -> bool: + """True only for real iterables (lists/tuples/sets/generators), NOT for + arbitrary objects that happen to expose ``__iter__``.""" + if obj is None: + return False + if isinstance(obj, (list, tuple, set, frozenset)): + return True + if isinstance(obj, (str, bytes, bytearray, Mapping)): + return False + if isinstance(obj, NonCallableMock): + return False + return hasattr(obj, "__iter__") + + +def _read_counts(usage_obj: Any) -> Optional[TokenUsage]: + """Read ``input/output/total`` from any usage-bearing object/dict.""" + if usage_obj is None: + return None + + inp = out = tot = 0 + for k in _INPUT_KEYS: + v = _get(usage_obj, k) + if v: + inp = _to_int(v) + break + for k in _OUTPUT_KEYS: + v = _get(usage_obj, k) + if v: + out = _to_int(v) + break + for k in _TOTAL_KEYS: + v = _get(usage_obj, k) + if v: + tot = _to_int(v) + break + + if tot == 0 and (inp or out): + tot = inp + out + if not (inp or out or tot): + return None + return TokenUsage(input_tokens=inp, output_tokens=out, total_tokens=tot) + + +# --------------------------------------------------------------------------- +# Extraction -- public +# --------------------------------------------------------------------------- +def extract_usage(result: Any) -> Optional[TokenUsage]: + """Extract usage from an agent_framework run result or ChatCompletion. + + Checks (in order): + 1. ``result.usage_details`` or ``result.usage`` + 2. ``result.raw_representation.usage`` (OpenAI ChatCompletion shape) + 3. Aggregated ``result.messages[*].contents[*].usage_details`` + + Never raises -- returns ``None`` on any unexpected shape. + """ + if result is None: + return None + + try: + for attr in ("usage_details", "usage"): + found = _read_counts(_get(result, attr)) + if found: + return found + + raw = _get(result, "raw_representation") + if raw is not None: + found = _read_counts(_get(raw, "usage")) + if found: + return found + + aggregated = TokenUsage() + found_any = False + messages = _get(result, "messages") + if not _is_iterable(messages): + return None + for msg in messages: + contents = _get(msg, "contents") + if not _is_iterable(contents): + continue + for content in contents: + usage = _get(content, "usage_details") or _get(content, "usage") + piece = _read_counts(usage) + if piece: + aggregated = aggregated + piece + found_any = True + return aggregated if found_any else None + except Exception as exc: + logger.debug("extract_usage failed: %s", exc, exc_info=True) + return None + + +def extract_usage_from_dict(data: Any) -> Optional[TokenUsage]: + """Extract from a raw dict / SDK usage object.""" + return _read_counts(data) + + +def extract_usage_from_stream_chunk(chunk: Any) -> Optional[TokenUsage]: + """Streaming chunks: try the top-level shape, then ``chunk.metadata.usage``.""" + found = extract_usage(chunk) + if found: + return found + metadata = _get(chunk, "metadata") + if metadata is not None: + return _read_counts(_get(metadata, "usage")) + return None + + +def extract_realtime_usage(response_obj: Any) -> Optional[TokenUsage]: + """Azure AI Voice Live ``response.done`` payload extractor. + + Includes audio / text / cached sub-counts when present. + """ + usage = _get(response_obj, "usage") + if usage is None: + return None + + inp = _to_int(_get(usage, "input_tokens")) + out = _to_int(_get(usage, "output_tokens")) + tot = _to_int(_get(usage, "total_tokens")) + if tot == 0 and (inp or out): + tot = inp + out + + in_details = _get(usage, "input_token_details") or {} + out_details = _get(usage, "output_token_details") or {} + + record = TokenUsage( + input_tokens=inp, + output_tokens=out, + total_tokens=tot, + input_audio_tokens=_to_int(_get(in_details, "audio_tokens")) if _get(in_details, "audio_tokens") is not None else None, + input_text_tokens=_to_int(_get(in_details, "text_tokens")) if _get(in_details, "text_tokens") is not None else None, + input_cached_tokens=_to_int(_get(in_details, "cached_tokens")) if _get(in_details, "cached_tokens") is not None else None, + output_audio_tokens=_to_int(_get(out_details, "audio_tokens")) if _get(out_details, "audio_tokens") is not None else None, + output_text_tokens=_to_int(_get(out_details, "text_tokens")) if _get(out_details, "text_tokens") is not None else None, + ) + if record.has_any or any( + v for v in ( + record.input_audio_tokens, + record.input_text_tokens, + record.input_cached_tokens, + record.output_audio_tokens, + record.output_text_tokens, + ) + ): + return record + return None + + +# --------------------------------------------------------------------------- +# Tool / sub-agent attribution +# --------------------------------------------------------------------------- +def detect_invoked_tools(result: Any) -> set[str]: + """Return the set of tool/function names invoked in an agent result. + + Used by orchestrators that expose sub-agents via ``.as_tool()`` to attribute + token usage only to the sub-agents that were actually called. Never raises. + """ + invoked: set[str] = set() + try: + messages = _get(result, "messages") + if not _is_iterable(messages): + return invoked + for msg in messages: + contents = _get(msg, "contents") + if not _is_iterable(contents): + continue + for content in contents: + if _get(content, "type") == "function_call": + name = _get(content, "name") + if name: + invoked.add(str(name)) + except Exception as exc: + logger.debug("detect_invoked_tools failed: %s", exc, exc_info=True) + return invoked + + +# --------------------------------------------------------------------------- +# Event sink (optional Application Insights dependency) +# --------------------------------------------------------------------------- +EventSink = Callable[[str, Mapping[str, str]], None] + + +def _default_event_sink() -> Optional[EventSink]: + """Return ``azure.monitor.events.extension.track_event`` if importable, + else ``None``.""" + try: + from azure.monitor.events.extension import track_event # type: ignore + except Exception: # pragma: no cover - optional dep + return None + return track_event + + +# --------------------------------------------------------------------------- +# Emitter +# --------------------------------------------------------------------------- +class TokenUsageEmitter: + """Emit standardized token-usage custom events to Application Insights. + + Responsibilities: + 1. Emit LLM_Agent_Token_Usage, LLM_Model_Token_Usage, and + LLM_Token_Usage_Summary events with consistent property schemas. + 2. Optionally sample high-cardinality events while always emitting + the summary event for accurate per-request totals. + 3. Support per-model pricing for estimated cost calculation. + 4. Hash user_id values for PII/GDPR compliance when configured. + + Attributes: + perf_slow_emit_threshold_ms: Soft threshold (ms) above which a + WARNING is logged for an individual emit call. + """ + + def __init__( + self, + *, + connection_string: Optional[str] = None, + static_dimensions: Optional[Mapping[str, Any]] = None, + event_sink: Optional[EventSink] = None, + pricing: Optional[Mapping[str, tuple[float, float]]] = None, + user_id_hasher: Optional[Callable[[str], str]] = None, + sample_rate: float = 1.0, + logger: Optional[logging.Logger] = None, + ) -> None: + self._cs = connection_string if connection_string is not None else os.getenv( + "APPLICATIONINSIGHTS_CONNECTION_STRING" + ) + self._sink = event_sink if event_sink is not None else _default_event_sink() + self._log = logger or logging.getLogger(__name__) + + self._user_id_hasher = user_id_hasher + + try: + sr = float(sample_rate) + except (TypeError, ValueError): + sr = 1.0 + self._sample_rate = max(0.0, min(1.0, sr)) + + self._pricing: dict[str, tuple[float, float]] = {} + for model, rates in (pricing or {}).items(): + if not model or rates is None: + continue + try: + inp, out = rates + self._pricing[str(model).lower()] = (float(inp), float(out)) + except (TypeError, ValueError): + self._log.warning("Ignoring malformed pricing entry: %s=%r", model, rates) + + raw_static = dict(static_dimensions or {}) + if "user_id" in raw_static: + raw_static["user_id"] = self._apply_user_id_hash(raw_static["user_id"]) + self._static: dict[str, str] = { + k: ("" if v is None else str(v)) for k, v in raw_static.items() + } + + self._perf_total_ns: int = 0 + self._perf_emit_count: int = 0 + self._perf_max_ns: int = 0 + self.perf_slow_emit_threshold_ms: float = 50.0 + + @property + def enabled(self) -> bool: + """True when App Insights connection string and event sink are available.""" + return bool(self._cs) and self._sink is not None + + @property + def sample_rate(self) -> float: + """Current sampling rate for high-cardinality events.""" + return self._sample_rate + + def _apply_user_id_hash(self, value: Any) -> Any: + """Apply the configured user_id_hasher; never raises.""" + if value is None or value == "" or self._user_id_hasher is None: + return value + try: + return self._user_id_hasher(str(value)) + except Exception as exc: + self._log.warning("user_id_hasher raised: %s", exc) + return value + + def _should_sample(self) -> bool: + """Sampling decision for high-cardinality events.""" + if self._sample_rate >= 1.0: + return True + if self._sample_rate <= 0.0: + return False + return random.random() < self._sample_rate + + def _cost_props( + self, model_deployment_name: Optional[str], usage: TokenUsage + ) -> dict[str, str]: + """Return ``{'estimated_cost_usd': '...'}`` when pricing is configured.""" + if not self._pricing or not model_deployment_name: + return {} + rate = self._pricing.get(model_deployment_name.lower()) + if not rate: + return {} + inp_rate, out_rate = rate + cost = (usage.input_tokens * inp_rate + usage.output_tokens * out_rate) / 1000.0 + return {"estimated_cost_usd": f"{cost:.6f}"} + + def _summary_cost_props( + self, + primary_model: Optional[str], + additional_agents: Mapping[str, str], + usage: TokenUsage, + ) -> dict[str, str]: + """Best-effort cost for the summary event.""" + if primary_model: + cost = self._cost_props(primary_model, usage) + if cost: + return cost + for m in additional_agents.values(): + cost = self._cost_props(m, usage) + if cost: + return cost + return {} + + def emit(self, event_name: str, **dimensions: Any) -> None: + """Low-level: emit an event with arbitrary properties. Never raises.""" + start_ns = time.perf_counter_ns() + try: + props = dict(self._static) + for k, v in dimensions.items(): + if v is None: + continue + if k == "user_id": + v = self._apply_user_id_hash(v) + if v is None or v == "": + continue + props[k] = v if isinstance(v, str) else str(v) + + # Deterministic event_id for deduplication across services. + # Key fields: event_name + process_id + agent_name + model + dedup_parts = [ + event_name, + props.get("process_id", ""), + props.get("agent_name", ""), + props.get("model_deployment_name", ""), + ] + props["event_id"] = hashlib.sha256( + "|".join(dedup_parts).encode() + ).hexdigest()[:16] + + if not self.enabled: + self._log.debug( + "App Insights not configured -- skipping event %s (%s)", + event_name, props, + ) + return + try: + self._sink(event_name, props) # type: ignore[misc] + except Exception as exc: + self._log.warning("track_event(%s) failed: %s", event_name, exc) + finally: + elapsed_ns = time.perf_counter_ns() - start_ns + self._perf_total_ns += elapsed_ns + self._perf_emit_count += 1 + if elapsed_ns > self._perf_max_ns: + self._perf_max_ns = elapsed_ns + elapsed_ms = elapsed_ns / 1_000_000.0 + if elapsed_ms > self.perf_slow_emit_threshold_ms: + self._log.warning( + "Token telemetry emit slow: event=%s duration_ms=%.3f", + event_name, elapsed_ms, + ) + else: + self._log.debug( + "Token telemetry emit: event=%s duration_ms=%.3f", + event_name, elapsed_ms, + ) + + def perf_stats(self) -> dict[str, float]: + """Return cumulative telemetry-overhead stats. + + Returns: + Dict with keys: emit_count, total_ms, avg_ms, max_ms. + """ + count = self._perf_emit_count + total_ms = self._perf_total_ns / 1_000_000.0 + return { + "emit_count": float(count), + "total_ms": total_ms, + "avg_ms": (total_ms / count) if count else 0.0, + "max_ms": self._perf_max_ns / 1_000_000.0, + } + + def reset_perf_stats(self) -> None: + """Zero the perf counters.""" + self._perf_total_ns = 0 + self._perf_emit_count = 0 + self._perf_max_ns = 0 + + def emit_agent( + self, + *, + agent_name: str, + model_deployment_name: str, + usage: TokenUsage, + **dimensions: Any, + ) -> None: + """Emit a per-agent token usage event.""" + if not usage.has_any or not self._should_sample(): + return + self.emit( + EVENT_AGENT, + agent_name=agent_name, + model_deployment_name=model_deployment_name, + **usage.to_event_props(), + **self._cost_props(model_deployment_name, usage), + **dimensions, + ) + + def emit_model( + self, + *, + model_deployment_name: str, + usage: TokenUsage, + **dimensions: Any, + ) -> None: + """Emit a per-model token usage event.""" + if not usage.has_any or not self._should_sample(): + return + self.emit( + EVENT_MODEL, + model_deployment_name=model_deployment_name, + **usage.to_event_props(), + **self._cost_props(model_deployment_name, usage), + **dimensions, + ) + + def emit_user( + self, + *, + user_id: str, + usage: TokenUsage, + **dimensions: Any, + ) -> None: + """Emit a per-user token usage event.""" + if not usage.has_any or not user_id or not self._should_sample(): + return + self.emit( + EVENT_USER, + user_id=user_id, + **usage.to_event_props(), + **dimensions, + ) + + def emit_team( + self, + *, + team_name: str, + usage: TokenUsage, + **dimensions: Any, + ) -> None: + """Emit a per-team token usage event.""" + if not usage.has_any or not team_name or not self._should_sample(): + return + self.emit( + EVENT_TEAM, + team_name=team_name, + **usage.to_event_props(), + **dimensions, + ) + + def emit_summary( + self, + *, + usage: TokenUsage, + agent_count: int = 1, + model_count: int = 1, + primary_model: Optional[str] = None, + additional_agents: Optional[Mapping[str, str]] = None, + **dimensions: Any, + ) -> None: + """Emit the summary event (always fires, ignores sample_rate).""" + if not usage.has_any: + return + props = { + "total_input_tokens": str(usage.input_tokens), + "total_output_tokens": str(usage.output_tokens), + "total_tokens": str(usage.total_tokens), + "agent_count": str(agent_count), + "model_count": str(model_count), + "sample_rate": f"{self._sample_rate:.4f}", + } + for k, v in usage.to_event_props().items(): + props.setdefault(k, v) + props.update(self._summary_cost_props(primary_model, additional_agents or {}, usage)) + self.emit(EVENT_SUMMARY, **props, **dimensions) + + def emit_speech( + self, + *, + model_deployment_name: str, + source: str, + usage: TokenUsage, + **dimensions: Any, + ) -> None: + """Voice-Live / realtime speech usage event.""" + if not self._should_sample(): + return + self.emit( + EVENT_SPEECH, + model_deployment_name=model_deployment_name, + source=source, + **usage.to_event_props(), + **self._cost_props(model_deployment_name, usage), + **dimensions, + ) + + def emit_all( + self, + *, + agent_name: str, + model_deployment_name: str, + usage: TokenUsage, + additional_agents: Optional[Mapping[str, str]] = None, + emit_user_event: bool = False, + emit_team_event: bool = False, + **dimensions: Any, + ) -> None: + """Emit summary, agent, and one model event per distinct model deployment. + + Args: + agent_name: Name of the primary agent/step. + model_deployment_name: Model deployment used by the primary agent. + usage: Accumulated token usage for this invocation. + additional_agents: Maps sub-agent name -> model deployment name. + emit_user_event: Opt in to per-user events. + emit_team_event: Opt in to per-team events. + **dimensions: Extra properties forwarded to all events. + """ + if not usage.has_any: + return + + agents = {agent_name: model_deployment_name} + if additional_agents: + agents.update({k: v for k, v in additional_agents.items() if k}) + models = {m for m in agents.values() if m} + + batch_start_ns = time.perf_counter_ns() + + self.emit_agent( + agent_name=agent_name, + model_deployment_name=model_deployment_name, + usage=usage, + **dimensions, + ) + for model in models: + self.emit_model( + model_deployment_name=model, + usage=usage, + **dimensions, + ) + if emit_user_event and dimensions.get("user_id"): + self.emit_user( + user_id=str(dimensions["user_id"]), + usage=usage, + agent_name=agent_name, + model_deployment_name=model_deployment_name, + ) + if emit_team_event and dimensions.get("team_name"): + self.emit_team( + team_name=str(dimensions["team_name"]), + usage=usage, + agent_name=agent_name, + model_deployment_name=model_deployment_name, + ) + + batch_overhead_ms = (time.perf_counter_ns() - batch_start_ns) / 1_000_000.0 + self.emit_summary( + usage=usage, + agent_count=len(agents), + model_count=len(models) or 1, + primary_model=model_deployment_name, + additional_agents=additional_agents, + telemetry_overhead_ms=f"{batch_overhead_ms:.3f}", + **dimensions, + ) + + self._log.debug( + "[TOKEN USAGE] agent=%s model=%s input=%d output=%d total=%d %s", + agent_name, + model_deployment_name, + usage.input_tokens, + usage.output_tokens, + usage.total_tokens, + " ".join(f"{k}={v}" for k, v in dimensions.items() if v), + ) + + +# --------------------------------------------------------------------------- +# Scope / decorator sugar +# --------------------------------------------------------------------------- +@dataclass +class TokenUsageScope(AbstractContextManager): + """Accumulate usage across multiple results, then emit on exit. + + Example:: + + with TokenUsageScope(emitter, + agent_name="MapHandler", + model_deployment_name=cfg.model, + process_id=pid) as scope: + result = await agent.run(prompt) + scope.add(result) + + Attributes: + emitter: The TokenUsageEmitter instance to use for emission. + agent_name: Name of the agent/step being tracked. + model_deployment_name: Model deployment name for attribution. + dimensions: Extra properties forwarded to all events. + additional_agents: Maps sub-agent name -> model deployment name. + emit_user_event: Whether to emit per-user events. + emit_team_event: Whether to emit per-team events. + usage: Accumulated TokenUsage so far. + """ + + emitter: TokenUsageEmitter + agent_name: str + model_deployment_name: str + dimensions: dict[str, Any] = field(default_factory=dict) + additional_agents: dict[str, str] = field(default_factory=dict) + emit_user_event: bool = False + emit_team_event: bool = False + usage: TokenUsage = field(default_factory=TokenUsage) + + def __init__( + self, + emitter: TokenUsageEmitter, + *, + agent_name: str, + model_deployment_name: str, + additional_agents: Optional[Mapping[str, str]] = None, + emit_user_event: bool = False, + emit_team_event: bool = False, + **dimensions: Any, + ) -> None: + self.emitter = emitter + self.agent_name = agent_name + self.model_deployment_name = model_deployment_name + self.additional_agents = dict(additional_agents or {}) + self.emit_user_event = emit_user_event + self.emit_team_event = emit_team_event + self.dimensions = dict(dimensions) + self.usage = TokenUsage() + self._extract_ns: int = 0 + self._emit_ns: int = 0 + + def add(self, source: Any) -> Optional[TokenUsage]: + """Extract usage from any supported shape and add to the running total. + + Args: + source: Agent run result, ChatMessage, or ChatCompletion object. + + Returns: + The extracted TokenUsage, or None if extraction failed. + """ + start_ns = time.perf_counter_ns() + try: + found = extract_usage(source) + except Exception as exc: + logger.debug("TokenUsageScope.add failed: %s", exc, exc_info=True) + return None + finally: + self._extract_ns += time.perf_counter_ns() - start_ns + if found: + self.usage = self.usage + found + return found + + def add_usage(self, usage: TokenUsage) -> None: + """Add a pre-constructed TokenUsage to the running total.""" + self.usage = self.usage + usage + + def add_chunks(self, chunks: Iterable[Any]) -> None: + """Extract and accumulate usage from a stream of chunks.""" + for c in chunks: + self.add(c) + + @property + def extract_ms(self) -> float: + """Total ms spent inside :meth:`add` / :meth:`add_chunks`.""" + return self._extract_ns / 1_000_000.0 + + @property + def emit_ms(self) -> float: + """Total ms spent in the on-exit emit batch.""" + return self._emit_ns / 1_000_000.0 + + @property + def total_overhead_ms(self) -> float: + """Total telemetry overhead added by this scope (extract + emit).""" + return self.extract_ms + self.emit_ms + + def __exit__(self, exc_type, exc, tb) -> None: + emit_start_ns = time.perf_counter_ns() + try: + self.emitter.emit_all( + agent_name=self.agent_name, + model_deployment_name=self.model_deployment_name, + usage=self.usage, + additional_agents=self.additional_agents, + emit_user_event=self.emit_user_event, + emit_team_event=self.emit_team_event, + **self.dimensions, + ) + except Exception as emit_exc: # pragma: no cover + logger.warning("TokenUsageScope emit failed: %s", emit_exc) + finally: + self._emit_ns += time.perf_counter_ns() - emit_start_ns + logger.debug( + "TokenUsageScope overhead: agent=%s extract_ms=%.3f " + "emit_ms=%.3f total_ms=%.3f", + self.agent_name, + self.extract_ms, + self.emit_ms, + self.total_overhead_ms, + ) + return None + + +def track_tokens( + emitter: TokenUsageEmitter, + *, + agent_name: str, + model_deployment_name: str, + dimension_args: Optional[Mapping[str, str]] = None, + additional_agents: Optional[Mapping[str, str]] = None, + emit_user_event: bool = False, + emit_team_event: bool = False, +): + """Decorator: wrap an async or sync function that returns an LLM result. + + Args: + emitter: TokenUsageEmitter to use. + agent_name: Name of the agent/step. + model_deployment_name: Model deployment name. + dimension_args: Maps emitted-property-name -> callable-keyword-argument. + additional_agents: Sub-agent name -> model deployment name mapping. + emit_user_event: Opt in to per-user events. + emit_team_event: Opt in to per-team events. + """ + + dim_args = dict(dimension_args or {}) + + def _decorator(fn: Callable[..., Any]): + is_coro = _is_coroutine_function(fn) + + if is_coro: + @functools.wraps(fn) + async def _aw(*args, **kwargs) -> Any: + with _scope_for(kwargs) as scope: + result = await fn(*args, **kwargs) + scope.add(result) + return result + return _aw + + @functools.wraps(fn) + def _sw(*args, **kwargs) -> Any: + with _scope_for(kwargs) as scope: + result = fn(*args, **kwargs) + scope.add(result) + return result + return _sw + + def _scope_for(call_kwargs: Mapping[str, Any]) -> TokenUsageScope: + dimensions = { + prop: call_kwargs.get(kw) + for prop, kw in dim_args.items() + if call_kwargs.get(kw) is not None + } + return TokenUsageScope( + emitter, + agent_name=agent_name, + model_deployment_name=model_deployment_name, + additional_agents=additional_agents, + emit_user_event=emit_user_event, + emit_team_event=emit_team_event, + **dimensions, + ) + + return _decorator + + +def _is_coroutine_function(fn: Callable[..., Any]) -> bool: + return asyncio.iscoroutinefunction(fn) + + +__all__ = [ + "EVENT_SUMMARY", + "EVENT_AGENT", + "EVENT_MODEL", + "EVENT_USER", + "EVENT_TEAM", + "EVENT_SPEECH", + "TokenUsage", + "TokenUsageEmitter", + "TokenUsageScope", + "track_tokens", + "extract_usage", + "extract_usage_from_dict", + "extract_usage_from_stream_chunk", + "extract_realtime_usage", + "detect_invoked_tools", +] diff --git a/src/ContentProcessorWorkflow/src/libs/telemetry.py b/src/ContentProcessorWorkflow/src/libs/telemetry.py new file mode 100644 index 00000000..79e1f4bf --- /dev/null +++ b/src/ContentProcessorWorkflow/src/libs/telemetry.py @@ -0,0 +1,92 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Process-wide telemetry singletons for the content-processing workflow. + +A single :class:`TokenUsageEmitter` is constructed at import time so every +executor shares the same App Insights connection-string resolution and +static dimensions. Importing this module has no side effects beyond reading +``APPLICATIONINSIGHTS_CONNECTION_STRING`` and the env vars documented below. + +Optional environment variables +------------------------------ +LLM_TOKEN_SAMPLE_RATE + Float in [0, 1]. Fraction of high-cardinality token events + (agent/model/user/team/speech) to ship. The summary event always fires. + Defaults to ``1.0``. + +LLM_TOKEN_USER_ID_HMAC_KEY + When set, ``user_id`` values are replaced with an HMAC-SHA256 hex digest + (truncated to 16 chars) before leaving the process. Use to satisfy + GDPR / PII handling requirements without modifying call sites. + +LLM_TOKEN_PRICING + Optional comma-separated list of ``model=in_per_1k:out_per_1k`` entries, + e.g. ``gpt-4o=0.0025:0.01,gpt-4o-mini=0.00015:0.0006``. When set the + emitter attaches ``estimated_cost_usd`` to agent / model / summary + events so dashboards can group by cost without hard-coded KQL rates. +""" +from __future__ import annotations + +import hashlib +import hmac +import logging +import os +from typing import Callable, Optional + +from libs.llm_token_telemetry import TokenUsageEmitter + +_log = logging.getLogger(__name__) + + +def _parse_sample_rate() -> float: + raw = os.getenv("LLM_TOKEN_SAMPLE_RATE") + if not raw: + return 1.0 + try: + return max(0.0, min(1.0, float(raw))) + except ValueError: + _log.warning("Invalid LLM_TOKEN_SAMPLE_RATE=%r; defaulting to 1.0", raw) + return 1.0 + + +def _build_user_id_hasher() -> Optional[Callable[[str], str]]: + key = os.getenv("LLM_TOKEN_USER_ID_HMAC_KEY") + if not key: + return None + key_bytes = key.encode("utf-8") + + def _hash(value: str) -> str: + digest = hmac.new(key_bytes, value.encode("utf-8"), hashlib.sha256).hexdigest() + return digest[:16] + + return _hash + + +def _parse_pricing() -> dict[str, tuple[float, float]]: + raw = os.getenv("LLM_TOKEN_PRICING") + if not raw: + return {} + pricing: dict[str, tuple[float, float]] = {} + for entry in raw.split(","): + entry = entry.strip() + if not entry or "=" not in entry: + continue + model, rates = entry.split("=", 1) + if ":" not in rates: + continue + in_s, out_s = rates.split(":", 1) + try: + pricing[model.strip().lower()] = (float(in_s), float(out_s)) + except ValueError: + _log.warning("Ignoring malformed pricing entry: %s", entry) + return pricing + + +token_emitter = TokenUsageEmitter( + static_dimensions={"app": "content-processing"}, + sample_rate=_parse_sample_rate(), + user_id_hasher=_build_user_id_hasher(), + pricing=_parse_pricing(), +) + +__all__ = ["token_emitter"] diff --git a/src/ContentProcessorWorkflow/src/steps/gap_analysis/executor/gap_executor.py b/src/ContentProcessorWorkflow/src/steps/gap_analysis/executor/gap_executor.py index 5390b09b..24c5785c 100644 --- a/src/ContentProcessorWorkflow/src/steps/gap_analysis/executor/gap_executor.py +++ b/src/ContentProcessorWorkflow/src/steps/gap_analysis/executor/gap_executor.py @@ -31,6 +31,9 @@ from steps.models.extracted_file import ExtractedFile from steps.models.output import Executor_Output, Workflow_Output +from libs.llm_token_telemetry import TokenUsageScope +from libs.telemetry import token_emitter + class GapExecutor(Executor): """Workflow executor that runs the GAP-analysis step. @@ -192,6 +195,20 @@ async def handle_execute( ) ) + # Track token usage for gap analysis + model_name = agent_framework_helper.settings.get_service_config("default").chat_deployment_name + file_names = ", ".join(f.file_name for f in processed_files) if processed_files else "" + file_types = ", ".join(set(f.mime_type for f in processed_files if f.mime_type)) if processed_files else "" + with TokenUsageScope( + token_emitter, + agent_name="GapAnalysis", + model_deployment_name=model_name, + process_id=result.claim_process_id, + file_name=file_names, + file_mime_type=file_types, + ) as scope: + scope.add(model_response) + claim_process_repository = self.app_context.get_service(Claim_Processes) await claim_process_repository.Update_Claim_Process_Gaps( process_id=result.claim_process_id, new_gaps=model_response.text diff --git a/src/ContentProcessorWorkflow/src/steps/rai/executor/rai_executor.py b/src/ContentProcessorWorkflow/src/steps/rai/executor/rai_executor.py index 64bca6f3..de56fd9f 100644 --- a/src/ContentProcessorWorkflow/src/steps/rai/executor/rai_executor.py +++ b/src/ContentProcessorWorkflow/src/steps/rai/executor/rai_executor.py @@ -27,6 +27,9 @@ from services.content_process_service import ContentProcessService from steps.rai.model import rai_response +from libs.llm_token_telemetry import TokenUsageScope +from libs.telemetry import token_emitter + class RAIExecutor(Executor): """Workflow executor that applies Responsible-AI content analysis. @@ -186,6 +189,20 @@ async def handle_exectue( ) ) + # Track token usage for RAI check + model_name = agent_framework_helper.settings.get_service_config("default").chat_deployment_name + file_names = ", ".join(f.file_name for f in processed_files) if processed_files else "" + file_types = ", ".join(set(f.mime_type for f in processed_files if f.mime_type)) if processed_files else "" + with TokenUsageScope( + token_emitter, + agent_name="RAI", + model_deployment_name=model_name, + process_id=result.claim_process_id, + file_name=file_names, + file_mime_type=file_types, + ) as scope: + scope.add(model_response) + response_content = model_response.text parsed_response = rai_response.RAIResponse.model_validate_json(response_content) diff --git a/src/ContentProcessorWorkflow/src/steps/summarize/executor/summarize_executor.py b/src/ContentProcessorWorkflow/src/steps/summarize/executor/summarize_executor.py index a4e8b910..23bd30ce 100644 --- a/src/ContentProcessorWorkflow/src/steps/summarize/executor/summarize_executor.py +++ b/src/ContentProcessorWorkflow/src/steps/summarize/executor/summarize_executor.py @@ -28,6 +28,9 @@ from steps.models.extracted_file import ExtractedFile from steps.models.output import Executor_Output, Workflow_Output +from libs.llm_token_telemetry import TokenUsageScope +from libs.telemetry import token_emitter + class SummarizeExecutor(Executor): """Workflow executor that runs the summarization step. @@ -192,6 +195,20 @@ async def handle_execute( ) ) + # Track token usage for summarization + model_name = agent_framework_helper.settings.get_service_config("default").chat_deployment_name + file_names = ", ".join(f.file_name for f in processed_files) if processed_files else "" + file_types = ", ".join(set(f.mime_type for f in processed_files if f.mime_type)) if processed_files else "" + with TokenUsageScope( + token_emitter, + agent_name="Summarize", + model_deployment_name=model_name, + process_id=result.claim_process_id, + file_name=file_names, + file_mime_type=file_types, + ) as scope: + scope.add(model_response) + summarized_result = {"status": "summarized", "input": model_response.text} claim_process_repository = self.app_context.get_service(Claim_Processes) diff --git a/src/ContentProcessorWorkflow/uv.lock b/src/ContentProcessorWorkflow/uv.lock index 9fd628a0..608166a4 100644 --- a/src/ContentProcessorWorkflow/uv.lock +++ b/src/ContentProcessorWorkflow/uv.lock @@ -736,6 +736,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e1/28/af9ef022f21e3b51b3718d4348f771b490678c1116563895547c0a771362/azure_identity-1.26.0b1-py3-none-any.whl", hash = "sha256:dc608b59ae628a38611208ee761adeb1a2b9390258b58d6edcda2d24c50a4348", size = 197227, upload-time = "2025-11-07T03:04:16.923Z" }, ] +[[package]] +name = "azure-monitor-events-extension" +version = "0.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-sdk" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cd/51/976c8cd4a76d41bcd4d3f6400aeed8fdd70d516d271badf9c4a5893a558d/azure-monitor-events-extension-0.1.0.tar.gz", hash = "sha256:094773685171a50aa5cc548279c9141c8a26682f6acef397815c528b53b838b5", size = 4165, upload-time = "2023-09-19T20:01:17.887Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/09/44/cbb68c55505a604de61caa44375be7371368e71aa8386b1576be5b789e11/azure_monitor_events_extension-0.1.0-py2.py3-none-any.whl", hash = "sha256:5d92abb5e6a32ab23b12c726def9f9607c6fa1d84900d493b906ff9ec489af4a", size = 4514, upload-time = "2023-09-19T20:01:16.162Z" }, +] + [[package]] name = "azure-monitor-opentelemetry" version = "1.8.7" @@ -2960,6 +2973,7 @@ dependencies = [ { name = "azure-appconfiguration" }, { name = "azure-core" }, { name = "azure-identity" }, + { name = "azure-monitor-events-extension" }, { name = "azure-monitor-opentelemetry" }, { name = "azure-storage-blob" }, { name = "azure-storage-file-datalake" }, @@ -3001,6 +3015,7 @@ requires-dist = [ { name = "azure-appconfiguration", specifier = "==1.8.0" }, { name = "azure-core", specifier = "==1.38.0" }, { name = "azure-identity", specifier = "==1.26.0b1" }, + { name = "azure-monitor-events-extension", specifier = ">=0.1.0" }, { name = "azure-monitor-opentelemetry", specifier = "==1.8.7" }, { name = "azure-storage-blob", specifier = "==12.29.0b1" }, { name = "azure-storage-file-datalake", specifier = "==12.23.0" }, @@ -3015,7 +3030,7 @@ requires-dist = [ { name = "psutil", specifier = "==7.2.1" }, { name = "pyasn1", specifier = "==0.6.3" }, { name = "pyjwt", specifier = "==2.12.1" }, - { name = "python-multipart", specifier = "==0.0.27" }, + { name = "python-multipart", specifier = "==0.0.26" }, { name = "pytz", specifier = "==2025.2" }, { name = "sas-cosmosdb", specifier = "==0.1.4" }, { name = "sas-storage", specifier = "==1.0.0" }, @@ -3482,11 +3497,11 @@ wheels = [ [[package]] name = "python-multipart" -version = "0.0.27" +version = "0.0.26" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/69/9b/f23807317a113dc36e74e75eb265a02dd1a4d9082abc3c1064acd22997c4/python_multipart-0.0.27.tar.gz", hash = "sha256:9870a6a8c5a20a5bf4f07c017bd1489006ff8836cff097b6933355ee2b49b602", size = 44043, upload-time = "2026-04-27T10:51:26.649Z" } +sdist = { url = "https://files.pythonhosted.org/packages/88/71/b145a380824a960ebd60e1014256dbb7d2253f2316ff2d73dfd8928ec2c3/python_multipart-0.0.26.tar.gz", hash = "sha256:08fadc45918cd615e26846437f50c5d6d23304da32c341f289a617127b081f17", size = 43501, upload-time = "2026-04-10T14:09:59.473Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/99/78/4126abcbdbd3c559d43e0db7f7b9173fc6befe45d39a2856cc0b8ec2a5a6/python_multipart-0.0.27-py3-none-any.whl", hash = "sha256:6fccfad17a27334bd0193681b369f476eda3409f17381a2d65aa7df3f7275645", size = 29254, upload-time = "2026-04-27T10:51:24.997Z" }, + { url = "https://files.pythonhosted.org/packages/9a/22/f1925cdda983ab66fc8ec6ec8014b959262747e58bdca26a4e3d1da29d56/python_multipart-0.0.26-py3-none-any.whl", hash = "sha256:c0b169f8c4484c13b0dcf2ef0ec3a4adb255c4b7d18d8e420477d2b1dd03f185", size = 28847, upload-time = "2026-04-10T14:09:58.131Z" }, ] [[package]] @@ -4077,11 +4092,11 @@ wheels = [ [[package]] name = "urllib3" -version = "2.7.0" +version = "2.6.3" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" }, + { url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" }, ] [[package]] diff --git a/src/tests/ContentProcessorWorkflow/.coveragerc b/src/tests/ContentProcessorWorkflow/.coveragerc index 7827f004..ef92ca39 100644 --- a/src/tests/ContentProcessorWorkflow/.coveragerc +++ b/src/tests/ContentProcessorWorkflow/.coveragerc @@ -23,6 +23,9 @@ omit = # Exclude repositories and steps (require agent_framework) */repositories/* */steps/* + # Exclude cross-accelerator telemetry module (tested in ContentProcessor suite) + */libs/llm_token_telemetry.py + */libs/telemetry.py # Exclude test files */tests/* */test_*.py