diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index a8781c9b9..19e53e7b9 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -127,7 +127,7 @@ PromptClient, TextPromptClient, ) -from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext +from langfuse.types import BatchMaskFunction, MaskFunction, ScoreDataType, SpanLevel, TraceContext class Langfuse: @@ -165,6 +165,9 @@ class Langfuse: media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable. sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable. mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API. + batch_mask (Optional[BatchMaskFunction]): Optional batch mask function for batch/async masking. When set, the SDK can mask spans in batches prior to export (e.g. to limit network calls to a masking backend). Use with use_async_masking to run masking off the main thread. + mask_batch_size (Optional[int]): Max number of items to pass per batch to batch_mask. Ignored if batch_mask is not set. + use_async_masking (bool): If True and batch_mask is set, run batch masking in a background thread so the main thread is not blocked. Default False. blocked_instrumentation_scopes (Optional[List[str]]): Deprecated. Use `should_export_span` instead. 
Equivalent behavior: ```python from langfuse.span_filter import is_default_export_span @@ -220,6 +223,9 @@ class Langfuse: _resources: Optional[LangfuseResourceManager] = None _mask: Optional[MaskFunction] = None + _batch_mask: Optional[BatchMaskFunction] = None + _mask_batch_size: Optional[int] = None + _use_async_masking: bool = False _otel_tracer: otel_trace_api.Tracer def __init__( @@ -240,6 +246,9 @@ def __init__( media_upload_thread_count: Optional[int] = None, sample_rate: Optional[float] = None, mask: Optional[MaskFunction] = None, + batch_mask: Optional[BatchMaskFunction] = None, + mask_batch_size: Optional[int] = None, + use_async_masking: bool = False, blocked_instrumentation_scopes: Optional[List[str]] = None, should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, additional_headers: Optional[Dict[str, str]] = None, @@ -335,6 +344,9 @@ def __init__( media_upload_thread_count=media_upload_thread_count, sample_rate=sample_rate, mask=mask, + batch_mask=batch_mask, + mask_batch_size=mask_batch_size, + use_async_masking=use_async_masking, tracing_enabled=self._tracing_enabled, blocked_instrumentation_scopes=blocked_instrumentation_scopes, should_export_span=should_export_span, @@ -342,6 +354,9 @@ def __init__( tracer_provider=tracer_provider, ) self._mask = self._resources.mask + self._batch_mask = self._resources.batch_mask + self._mask_batch_size = self._resources.mask_batch_size + self._use_async_masking = self._resources.use_async_masking self._otel_tracer = ( self._resources.tracer diff --git a/langfuse/_client/get_client.py b/langfuse/_client/get_client.py index dd2ee4a29..dc7a8c87f 100644 --- a/langfuse/_client/get_client.py +++ b/langfuse/_client/get_client.py @@ -50,6 +50,9 @@ def _create_client_from_instance( media_upload_thread_count=instance.media_upload_thread_count, sample_rate=instance.sample_rate, mask=instance.mask, + batch_mask=instance.batch_mask, + mask_batch_size=instance.mask_batch_size, + 
"""Masking span exporter for batch masking prior to OTLP export.

When batch_mask is configured, this exporter wraps the real OTLPSpanExporter.
It collects maskable span attributes (input/output/metadata), calls the
batch mask function, then wraps each span with MaskedAttributeSpanWrapper
so the OTLP exporter serializes masked data.

When use_async_masking is True, batch masking and export run in a background
thread so the main thread is not blocked.
"""

import threading
from queue import Empty, Full, Queue
from typing import Any, Dict, List, Optional, Sequence, Tuple

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse.logger import langfuse_logger
from langfuse.types import BatchMaskFunction

# Queue sentinels; compared by identity, so bare objects suffice.
_SHUTDOWN_SENTINEL: Any = object()
_FLUSH_SENTINEL: Any = object()


class MaskedAttributeSpanWrapper:
    """Wraps a ReadableSpan and overlays masked attribute values.

    When using batch masking, the exporter wraps each ReadableSpan in this
    class so that attribute reads (e.g. by the OTLP exporter) see masked
    input/output/metadata instead of the original values.

    Delegates all span properties and methods to the underlying span except
    for attributes: both .attributes and ._attributes return a merge of the
    original span attributes and the provided masked_attributes (masked
    values take precedence).
    """

    def __init__(self, span: ReadableSpan, masked_attributes: Dict[str, Any]) -> None:
        """Store the span and precompute the merged attribute mapping.

        Args:
            span: The underlying ReadableSpan to delegate to.
            masked_attributes: Attribute overrides; these win over the
                span's own attributes on key collision.
        """
        self._span = span
        self._masked_attributes = masked_attributes
        # Merge eagerly so repeated attribute reads are cheap.
        base = dict(span.attributes) if span.attributes else {}
        self._merged_attributes = {**base, **masked_attributes}

    @property
    def attributes(self) -> Dict[str, Any]:
        """Return span attributes with masked values overlaid."""
        return self._merged_attributes

    @property
    def _attributes(self) -> Dict[str, Any]:
        """Return span attributes with masked values overlaid (private API)."""
        return self._merged_attributes

    def __getattr__(self, name: str) -> Any:
        """Delegate all other attribute access to the underlying span."""
        return getattr(self._span, name)


# Attribute keys that may contain PII and are passed to batch_mask.
_MASKABLE_ATTR_KEYS = (
    LangfuseOtelSpanAttributes.TRACE_INPUT,
    LangfuseOtelSpanAttributes.TRACE_OUTPUT,
    LangfuseOtelSpanAttributes.OBSERVATION_INPUT,
    LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT,
    LangfuseOtelSpanAttributes.OBSERVATION_METADATA,
)


def _collect_maskable_items(
    spans: Sequence[ReadableSpan],
) -> Tuple[List[Any], List[Tuple[int, str]]]:
    """Collect maskable attribute values from spans in deterministic order.

    Args:
        spans: Spans to scan for maskable attributes.

    Returns:
        (items, backref): items is the list to pass to batch_mask;
        backref[i] is (span_index, attr_key) for items[i].
    """
    items: List[Any] = []
    backref: List[Tuple[int, str]] = []
    for span_idx, span in enumerate(spans):
        attrs = span.attributes or {}
        for key in _MASKABLE_ATTR_KEYS:
            if key in attrs:
                items.append(attrs[key])
                backref.append((span_idx, key))
    return items, backref


def _apply_batch_mask(
    batch_mask: BatchMaskFunction,
    items: List[Any],
    backref: List[Tuple[int, str]],
    mask_batch_size: Optional[int],
) -> List[Tuple[int, str, Any]]:
    """Call batch_mask (in chunks if mask_batch_size set), return (span_idx, key, masked_value).

    On any failure — the mask function raising, or returning a list of the
    wrong length — the affected chunk falls back to empty strings so that
    unmasked data is never exported.

    Args:
        batch_mask: User-provided batch mask function.
        items: Values to mask, in the order produced by _collect_maskable_items.
        backref: Parallel list mapping each item back to (span_index, attr_key).
        mask_batch_size: Max items per batch_mask call; None or <= 0 means
            a single call with all items.

    Returns:
        List of (span_index, attr_key, masked_value) triples.
    """
    if not items:
        return []

    # A non-positive/absent batch size degenerates to one all-items chunk,
    # which unifies the chunked and unchunked code paths.
    chunk_size = (
        mask_batch_size if mask_batch_size and mask_batch_size > 0 else len(items)
    )

    masked: List[Any] = []
    for start in range(0, len(items), chunk_size):
        chunk = items[start : start + chunk_size]
        try:
            chunk_masked = batch_mask(chunk)
        except Exception as e:
            langfuse_logger.error(
                "Batch masking failed; exporting with fallback masking. Error: %s", e
            )
            chunk_masked = [""] * len(chunk)
        if len(chunk_masked) != len(chunk):
            # Length mismatch means we cannot map results back to spans safely.
            langfuse_logger.error(
                "Batch mask returned length %s, expected %s; using fallback.",
                len(chunk_masked),
                len(chunk),
            )
            chunk_masked = [""] * len(chunk)
        masked.extend(chunk_masked)

    return [(backref[j][0], backref[j][1], masked[j]) for j in range(len(masked))]


def _build_masked_attributes_per_span(
    num_spans: int,
    masked_triples: List[Tuple[int, str, Any]],
) -> List[Dict[str, Any]]:
    """Build a masked_attributes dict for each span.

    Args:
        num_spans: Number of spans in the batch (defines result length).
        masked_triples: Output of _apply_batch_mask.

    Returns:
        One dict per span; empty when the span had no maskable attributes.
    """
    result: List[Dict[str, Any]] = [{} for _ in range(num_spans)]
    for span_idx, key, value in masked_triples:
        result[span_idx][key] = value
    return result


def _export_batch_sync(
    span_exporter: SpanExporter,
    batch_mask: BatchMaskFunction,
    mask_batch_size: Optional[int],
    spans: Sequence[ReadableSpan],
) -> SpanExportResult:
    """Run batch mask and export on the given spans; used by sync and worker.

    Spans without maskable attributes pass through unwrapped; otherwise each
    span is wrapped in MaskedAttributeSpanWrapper before delegation.
    """
    if not spans:
        return span_exporter.export(spans)

    items, backref = _collect_maskable_items(spans)
    if not items:
        # Nothing to mask in this batch; export unchanged.
        return span_exporter.export(spans)

    masked_triples = _apply_batch_mask(batch_mask, items, backref, mask_batch_size)
    masked_per_span = _build_masked_attributes_per_span(len(spans), masked_triples)

    wrapped = [
        MaskedAttributeSpanWrapper(span, masked_per_span[i])
        for i, span in enumerate(spans)
    ]
    return span_exporter.export(wrapped)


class MaskingSpanExporter(SpanExporter):
    """SpanExporter that runs batch masking before delegating to the real exporter.

    In sync mode (use_async_masking=False) export() masks and exports inline.
    In async mode, span batches are queued and a daemon worker thread masks
    and exports them; export() then reports SUCCESS optimistically.
    """

    _QUEUE_MAX_SIZE = 1000
    _QUEUE_PUT_TIMEOUT_SEC = 5.0

    def __init__(
        self,
        *,
        span_exporter: SpanExporter,
        batch_mask: BatchMaskFunction,
        mask_batch_size: Optional[int] = None,
        use_async_masking: bool = False,
    ) -> None:
        """Initialize the exporter and, in async mode, start the worker thread.

        Args:
            span_exporter: The real exporter to delegate to.
            batch_mask: Batch mask function applied before export.
            mask_batch_size: Max items per batch_mask call; None for no limit.
            use_async_masking: If True, mask/export on a background thread.
        """
        self._span_exporter = span_exporter
        self._batch_mask = batch_mask
        self._mask_batch_size = mask_batch_size
        self._use_async_masking = use_async_masking

        self._queue: Optional[Queue] = None
        self._worker: Optional[threading.Thread] = None
        self._flush_event: Optional[threading.Event] = None
        self._closed = False

        if use_async_masking:
            self._queue = Queue(maxsize=self._QUEUE_MAX_SIZE)
            self._flush_event = threading.Event()
            # Daemon thread so an un-shutdown exporter never blocks interpreter exit.
            self._worker = threading.Thread(
                target=self._worker_loop,
                name="langfuse-masking-exporter",
                daemon=True,
            )
            self._worker.start()

    def _export_item(self, spans: Sequence[ReadableSpan]) -> None:
        """Mask and export one queued batch, logging (not raising) failures."""
        try:
            _export_batch_sync(
                self._span_exporter,
                self._batch_mask,
                self._mask_batch_size,
                spans,
            )
        except Exception as e:
            langfuse_logger.error("Async batch masking/export failed: %s", e)

    def _drain_queue(self) -> bool:
        """Export everything currently queued.

        Returns:
            True if a shutdown sentinel was encountered while draining
            (it is re-queued so the main loop terminates), False otherwise.
        """
        assert self._queue is not None
        while True:
            try:
                extra = self._queue.get_nowait()
            except Empty:
                return False
            if extra is _SHUTDOWN_SENTINEL:
                # Preserve the sentinel for the main loop to observe.
                self._queue.put(extra)
                return True
            if extra is _FLUSH_SENTINEL:
                continue
            self._export_item(extra)

    def _worker_loop(self) -> None:
        """Background loop: mask/export queued batches, honor flush/shutdown.

        BUGFIX vs. previous revision: the flush event is set ONLY when a
        _FLUSH_SENTINEL is serviced. Setting it after every ordinary batch
        allowed a concurrent force_flush() — which clears the event and then
        enqueues the sentinel — to be woken prematurely by an unrelated
        export and return success while batches were still queued.
        """
        if self._queue is None or self._flush_event is None:
            return
        while True:
            try:
                item = self._queue.get(timeout=0.5)
            except Empty:
                continue
            if item is _SHUTDOWN_SENTINEL:
                break
            if item is _FLUSH_SENTINEL:
                # Drain all pending work before acknowledging the flush.
                self._drain_queue()
                self._flush_event.set()
                continue
            self._export_item(item)

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Export spans, masking them first (inline or via the worker).

        Returns:
            SUCCESS when exported inline or enqueued; FAILURE when the
            exporter is closed or the async queue is full (batch dropped).
        """
        if not spans:
            return self._span_exporter.export(spans)

        if not self._use_async_masking:
            return _export_batch_sync(
                self._span_exporter,
                self._batch_mask,
                self._mask_batch_size,
                spans,
            )

        if self._queue is None or self._closed:
            return SpanExportResult.FAILURE

        try:
            # Copy to a list so the batch is decoupled from the caller's sequence.
            self._queue.put(list(spans), timeout=self._QUEUE_PUT_TIMEOUT_SEC)
        except Full:
            langfuse_logger.error(
                "Masking exporter queue full; dropping batch of %s spans",
                len(spans),
            )
            return SpanExportResult.FAILURE
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        """Stop the worker (draining queued work first) and shut down the delegate."""
        self._closed = True
        if (
            self._worker is not None
            and self._worker.is_alive()
            and self._queue is not None
        ):
            self._queue.put(_SHUTDOWN_SENTINEL)
            self._worker.join(timeout=10.0)
            if self._worker.is_alive():
                langfuse_logger.warning(
                    "Masking exporter worker did not stop within timeout"
                )
        if hasattr(self._span_exporter, "shutdown"):
            self._span_exporter.shutdown()

    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
        """Flush pending batches; in async mode, wait for the worker to drain.

        Args:
            timeout_millis: Max time to wait for the async drain; defaults
                to 10 seconds when None.

        Returns:
            True when everything pending was exported and the delegate
            flushed; False on timeout or when the flush sentinel could not
            be enqueued.
        """
        if not self._use_async_masking:
            if hasattr(self._span_exporter, "force_flush"):
                return self._span_exporter.force_flush(timeout_millis)
            return True

        if self._queue is None or self._flush_event is None or self._closed:
            return True

        self._flush_event.clear()
        try:
            self._queue.put(_FLUSH_SENTINEL, timeout=self._QUEUE_PUT_TIMEOUT_SEC)
        except Full:
            return False
        timeout_sec = (timeout_millis / 1000.0) if timeout_millis is not None else 10.0
        if not self._flush_event.wait(timeout=timeout_sec):
            return False
        if hasattr(self._span_exporter, "force_flush"):
            return self._span_exporter.force_flush(timeout_millis)
        return True
blocked_instrumentation_scopes: Optional[List[str]] = None, should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, @@ -126,6 +129,9 @@ def __new__( media_upload_thread_count=media_upload_thread_count, sample_rate=sample_rate, mask=mask, + batch_mask=batch_mask, + mask_batch_size=mask_batch_size, + use_async_masking=use_async_masking, tracing_enabled=tracing_enabled if tracing_enabled is not None else True, @@ -154,6 +160,9 @@ def _initialize_instance( httpx_client: Optional[httpx.Client] = None, sample_rate: Optional[float] = None, mask: Optional[MaskFunction] = None, + batch_mask: Optional[BatchMaskFunction] = None, + mask_batch_size: Optional[int] = None, + use_async_masking: bool = False, tracing_enabled: bool = True, blocked_instrumentation_scopes: Optional[List[str]] = None, should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, @@ -165,6 +174,9 @@ def _initialize_instance( self.tracing_enabled = tracing_enabled self.base_url = base_url self.mask = mask + self.batch_mask = batch_mask + self.mask_batch_size = mask_batch_size + self.use_async_masking = use_async_masking self.environment = environment # Store additional client settings for get_client() to use @@ -196,6 +208,9 @@ def _initialize_instance( blocked_instrumentation_scopes=blocked_instrumentation_scopes, should_export_span=should_export_span, additional_headers=additional_headers, + batch_mask=self.batch_mask, + mask_batch_size=self.mask_batch_size, + use_async_masking=self.use_async_masking, ) tracer_provider.add_span_processor(langfuse_processor) @@ -445,6 +460,11 @@ def shutdown(self) -> None: atexit.unregister(self.shutdown) self.flush() + if self.tracer_provider is not None and not isinstance( + self.tracer_provider, otel_trace_api.ProxyTracerProvider + ): + self.tracer_provider.shutdown() + langfuse_logger.debug("Successfully shut down OTEL tracer provider") self._stop_and_join_consumer_threads() diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 
2590262ce..2bf7b3c95 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -539,7 +539,8 @@ def _mask_attribute(self, *, data: Any) -> Any: """Apply the configured mask function to data. Internal method that applies the client's configured masking function to - the provided data, with error handling and fallback. + the provided data, with error handling and fallback. When batch_mask is + set, masking is deferred to export time; this method returns data unchanged. Args: data: The data to mask @@ -550,6 +551,9 @@ def _mask_attribute(self, *, data: Any) -> Any: if not self._langfuse_client._mask: return data + if self._langfuse_client._batch_mask is not None: + return data + try: return self._langfuse_client._mask(data=data) except Exception as e: diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index 3750789c0..50f600c62 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -13,7 +13,7 @@ import base64 import os -from typing import Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional from opentelemetry import context as context_api from opentelemetry.context import Context @@ -27,10 +27,15 @@ LANGFUSE_FLUSH_INTERVAL, LANGFUSE_OTEL_TRACES_EXPORT_PATH, ) +from langfuse._client.masking_exporter import ( + MaskedAttributeSpanWrapper, + MaskingSpanExporter, +) from langfuse._client.propagation import _get_propagated_attributes_from_context from langfuse._client.span_filter import is_default_export_span, is_langfuse_span from langfuse._client.utils import span_formatter from langfuse.logger import langfuse_logger +from langfuse.types import BatchMaskFunction from langfuse.version import __version__ as langfuse_version @@ -63,6 +68,9 @@ def __init__( blocked_instrumentation_scopes: Optional[List[str]] = None, should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, additional_headers: Optional[Dict[str, str]] = None, + batch_mask: 
Optional[BatchMaskFunction] = None, + mask_batch_size: Optional[int] = None, + use_async_masking: bool = False, ): self.public_key = public_key self.blocked_instrumentation_scopes = ( @@ -111,6 +119,14 @@ def __init__( timeout=timeout, ) + if batch_mask is not None: + langfuse_span_exporter = MaskingSpanExporter( + span_exporter=langfuse_span_exporter, + batch_mask=batch_mask, + mask_batch_size=mask_batch_size, + use_async_masking=use_async_masking, + ) + super().__init__( span_exporter=langfuse_span_exporter, export_timeout_millis=timeout * 1_000 if timeout else None, diff --git a/langfuse/types.py b/langfuse/types.py index 067088e40..54f38104d 100644 --- a/langfuse/types.py +++ b/langfuse/types.py @@ -20,6 +20,7 @@ def my_evaluator(*, output: str, **kwargs) -> Evaluation: from typing import ( Any, Dict, + List, Literal, Protocol, TypedDict, @@ -51,6 +52,25 @@ class MaskFunction(Protocol): def __call__(self, *, data: Any, **kwargs: Dict[str, Any]) -> Any: ... +class BatchMaskFunction(Protocol): + """A function that masks a batch of data items in one call. + + Used for batch/async masking: the SDK may call this with multiple items + (e.g. span attribute values) so that IO-bound masking backends can + limit network calls. The returned list must have the same length and + order as the input list. + + Args: + items: List of data items to mask (e.g. input/output/metadata values). + + Returns: + List of masked items, same length and order as items. Each element + must be serializable to JSON. + """ + + def __call__(self, items: List[Any], **kwargs: Dict[str, Any]) -> List[Any]: ... + + class ParsedMediaReference(TypedDict): """A parsed media reference. 
@@ -71,9 +91,10 @@ class TraceContext(TypedDict): __all__ = [ - "SpanLevel", - "ScoreDataType", + "BatchMaskFunction", "MaskFunction", "ParsedMediaReference", + "ScoreDataType", + "SpanLevel", "TraceContext", ] diff --git a/tests/test_otel.py b/tests/test_otel.py index b4b985780..87c393c0b 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -3339,6 +3339,165 @@ def mock_process_media(*, data, field): span._process_media_in_attribute = original_process +class TestMaskedAttributeSpanWrapper: + """Tests for MaskedAttributeSpanWrapper (batch masking overlay).""" + + def test_wrapper_overlays_masked_attributes(self): + """Wrapper returns merged attributes with masked values taking precedence.""" + from unittest.mock import MagicMock + + from langfuse._client.span_processor import MaskedAttributeSpanWrapper + + mock_span = MagicMock(spec=ReadableSpan) + mock_span.attributes = { + LangfuseOtelSpanAttributes.OBSERVATION_INPUT: '{"secret": "data"}', + LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT: '{"result": "ok"}', + "other.attribute": "unchanged", + } + + masked = { + LangfuseOtelSpanAttributes.OBSERVATION_INPUT: '{"secret": "***MASKED***"}', + LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT: '{"result": "***MASKED***"}', + } + + wrapped = MaskedAttributeSpanWrapper(mock_span, masked) + + assert wrapped.attributes[LangfuseOtelSpanAttributes.OBSERVATION_INPUT] == ( + '{"secret": "***MASKED***"}' + ) + assert wrapped.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == ( + '{"result": "***MASKED***"}' + ) + assert wrapped.attributes["other.attribute"] == "unchanged" + assert wrapped._attributes == wrapped.attributes + + def test_wrapper_delegates_other_attributes(self): + """Wrapper delegates name, context, etc. 
to the underlying span.""" + from unittest.mock import MagicMock + + from langfuse._client.span_processor import MaskedAttributeSpanWrapper + + mock_span = MagicMock(spec=ReadableSpan) + mock_span.attributes = {} + mock_span.name = "my-span" + + wrapped = MaskedAttributeSpanWrapper(mock_span, {}) + assert wrapped.name == "my-span" + + +class TestMaskingSpanExporter: + """Tests for MaskingSpanExporter (batch masking before export).""" + + def test_export_calls_batch_mask_and_wraps_spans(self): + """MaskingSpanExporter collects maskable attrs, calls batch_mask, exports wrapped spans.""" + from unittest.mock import MagicMock + + from langfuse._client.masking_exporter import ( + MaskedAttributeSpanWrapper, + MaskingSpanExporter, + ) + + mock_exporter = MagicMock() + mock_exporter.export.return_value = SpanExportResult.SUCCESS + + def batch_mask(items): + return [f"{str(item)}_masked" for item in items] + + masking_exporter = MaskingSpanExporter( + span_exporter=mock_exporter, + batch_mask=batch_mask, + ) + + span1 = MagicMock(spec=ReadableSpan) + span1.attributes = { + LangfuseOtelSpanAttributes.OBSERVATION_INPUT: '{"raw": "input1"}', + LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT: "output1", + } + span2 = MagicMock(spec=ReadableSpan) + span2.attributes = { + LangfuseOtelSpanAttributes.TRACE_INPUT: "trace_input", + } + + result = masking_exporter.export([span1, span2]) + + assert result == SpanExportResult.SUCCESS + mock_exporter.export.assert_called_once() + (exported,) = mock_exporter.export.call_args[0] + assert len(exported) == 2 + assert all(isinstance(s, MaskedAttributeSpanWrapper) for s in exported) + assert exported[0].attributes[LangfuseOtelSpanAttributes.OBSERVATION_INPUT] == ( + '{"raw": "input1"}_masked' + ) + assert exported[0].attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == ( + "output1_masked" + ) + assert exported[1].attributes[LangfuseOtelSpanAttributes.TRACE_INPUT] == ( + "trace_input_masked" + ) + + def 
test_export_empty_spans_passthrough(self): + """Export with no spans delegates without calling batch_mask.""" + from unittest.mock import MagicMock + + from langfuse._client.masking_exporter import MaskingSpanExporter + + mock_exporter = MagicMock() + mock_exporter.export.return_value = SpanExportResult.SUCCESS + batch_mask = MagicMock() + + masking_exporter = MaskingSpanExporter( + span_exporter=mock_exporter, + batch_mask=batch_mask, + ) + result = masking_exporter.export([]) + + assert result == SpanExportResult.SUCCESS + mock_exporter.export.assert_called_once_with([]) + batch_mask.assert_not_called() + + def test_export_async_masking_uses_worker_and_force_flush_waits(self): + """With use_async_masking=True, export returns immediately; force_flush waits for worker.""" + from unittest.mock import MagicMock + + from langfuse._client.masking_exporter import ( + MaskedAttributeSpanWrapper, + MaskingSpanExporter, + ) + + mock_exporter = MagicMock() + mock_exporter.export.return_value = SpanExportResult.SUCCESS + mock_exporter.force_flush.return_value = True + + def batch_mask(items): + return [f"{str(x)}_masked" for x in items] + + exporter = MaskingSpanExporter( + span_exporter=mock_exporter, + batch_mask=batch_mask, + use_async_masking=True, + ) + span = MagicMock(spec=ReadableSpan) + span.attributes = { + LangfuseOtelSpanAttributes.OBSERVATION_INPUT: "secret", + } + + result = exporter.export([span]) + assert result == SpanExportResult.SUCCESS + mock_exporter.export.assert_not_called() + + flushed = exporter.force_flush(timeout_millis=5000) + assert flushed is True + mock_exporter.export.assert_called_once() + (exported,) = mock_exporter.export.call_args[0] + assert len(exported) == 1 + assert isinstance(exported[0], MaskedAttributeSpanWrapper) + assert exported[0].attributes[LangfuseOtelSpanAttributes.OBSERVATION_INPUT] == ( + "secret_masked" + ) + + exporter.shutdown() + + class TestOtelIdGeneration(TestOTelBase): """Tests for trace_id and observation_id 
generation with and without seeds."""