Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
PromptClient,
TextPromptClient,
)
from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext
from langfuse.types import BatchMaskFunction, MaskFunction, ScoreDataType, SpanLevel, TraceContext


class Langfuse:
Expand Down Expand Up @@ -165,6 +165,9 @@ class Langfuse:
media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
batch_mask (Optional[BatchMaskFunction]): Optional batch mask function for batch/async masking. When set, the SDK can mask spans in batches prior to export (e.g. to limit network calls to a masking backend). Use with use_async_masking to run masking off the main thread.
mask_batch_size (Optional[int]): Max number of items to pass per batch to batch_mask. Ignored if batch_mask is not set.
use_async_masking (bool): If True and batch_mask is set, run batch masking in a background thread so the main thread is not blocked. Default False.
blocked_instrumentation_scopes (Optional[List[str]]): Deprecated. Use `should_export_span` instead. Equivalent behavior:
```python
from langfuse.span_filter import is_default_export_span
Expand Down Expand Up @@ -220,6 +223,9 @@ class Langfuse:

_resources: Optional[LangfuseResourceManager] = None
_mask: Optional[MaskFunction] = None
_batch_mask: Optional[BatchMaskFunction] = None
_mask_batch_size: Optional[int] = None
_use_async_masking: bool = False
_otel_tracer: otel_trace_api.Tracer

def __init__(
Expand All @@ -240,6 +246,9 @@ def __init__(
media_upload_thread_count: Optional[int] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
batch_mask: Optional[BatchMaskFunction] = None,
mask_batch_size: Optional[int] = None,
use_async_masking: bool = False,
blocked_instrumentation_scopes: Optional[List[str]] = None,
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
additional_headers: Optional[Dict[str, str]] = None,
Expand Down Expand Up @@ -335,13 +344,19 @@ def __init__(
media_upload_thread_count=media_upload_thread_count,
sample_rate=sample_rate,
mask=mask,
batch_mask=batch_mask,
mask_batch_size=mask_batch_size,
use_async_masking=use_async_masking,
tracing_enabled=self._tracing_enabled,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
should_export_span=should_export_span,
additional_headers=additional_headers,
tracer_provider=tracer_provider,
)
self._mask = self._resources.mask
self._batch_mask = self._resources.batch_mask
self._mask_batch_size = self._resources.mask_batch_size
self._use_async_masking = self._resources.use_async_masking

self._otel_tracer = (
self._resources.tracer
Expand Down
3 changes: 3 additions & 0 deletions langfuse/_client/get_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ def _create_client_from_instance(
media_upload_thread_count=instance.media_upload_thread_count,
sample_rate=instance.sample_rate,
mask=instance.mask,
batch_mask=instance.batch_mask,
mask_batch_size=instance.mask_batch_size,
use_async_masking=instance.use_async_masking,
blocked_instrumentation_scopes=instance.blocked_instrumentation_scopes,
should_export_span=instance.should_export_span,
additional_headers=instance.additional_headers,
Expand Down
312 changes: 312 additions & 0 deletions langfuse/_client/masking_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
"""Masking span exporter for batch masking prior to OTLP export.

When batch_mask is configured, this exporter wraps the real OTLPSpanExporter.
It collects maskable span attributes (input/output/metadata), calls the
batch mask function, then wraps each span with MaskedAttributeSpanWrapper
so the OTLP exporter serializes masked data.

When use_async_masking is True, batch masking and export run in a background
thread so the main thread is not blocked.
"""

import threading
import time
from queue import Empty, Full, Queue
from typing import Any, Dict, List, Optional, Sequence, Tuple

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse.logger import langfuse_logger
from langfuse.types import BatchMaskFunction

# Queue control messages for the async worker. Each is a unique object whose
# identity is compared with `is`, so they can never collide with a real span
# batch placed on the queue.
_SHUTDOWN_SENTINEL: Any = object()
_FLUSH_SENTINEL: Any = object()


class MaskedAttributeSpanWrapper:
    """ReadableSpan proxy that overlays masked attribute values.

    The batch-masking exporter wraps every ReadableSpan in this proxy before
    handing it to the OTLP exporter, so that attribute reads observe masked
    input/output/metadata instead of the original values.

    All properties and methods are forwarded to the wrapped span, except
    ``.attributes`` / ``._attributes``: both return the original span
    attributes merged with ``masked_attributes`` (masked entries win).
    """

    def __init__(self, span: ReadableSpan, masked_attributes: Dict[str, Any]) -> None:
        self._span = span
        self._masked_attributes = masked_attributes
        # Eagerly precompute the merged view; masked values take precedence.
        merged = dict(span.attributes or {})
        merged.update(masked_attributes)
        self._merged_attributes = merged

    @property
    def attributes(self) -> Dict[str, Any]:
        """Span attributes with masked values overlaid."""
        return self._merged_attributes

    @property
    def _attributes(self) -> Dict[str, Any]:
        """Span attributes with masked values overlaid (private OTel API)."""
        return self._merged_attributes

    def __getattr__(self, name: str) -> Any:
        """Forward every other attribute lookup to the wrapped span."""
        return getattr(self._span, name)

# Attribute keys that may contain PII and are passed to batch_mask.
# TRACE_METADATA is included alongside the other trace-level keys so that
# trace metadata set on trace-level spans is masked too, not only
# inputs/outputs and observation metadata.
_MASKABLE_ATTR_KEYS = (
    LangfuseOtelSpanAttributes.TRACE_INPUT,
    LangfuseOtelSpanAttributes.TRACE_OUTPUT,
    LangfuseOtelSpanAttributes.TRACE_METADATA,
    LangfuseOtelSpanAttributes.OBSERVATION_INPUT,
    LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT,
    LangfuseOtelSpanAttributes.OBSERVATION_METADATA,
)
Comment on lines +61 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TRACE_METADATA missing from maskable keys

LangfuseOtelSpanAttributes.TRACE_METADATA ("langfuse.trace.metadata") exists and is set on trace-level spans (confirmed in attributes.py line 173), but it is absent from _MASKABLE_ATTR_KEYS. This means trace metadata will never be passed to batch_mask, leaving potentially sensitive data unmasked even when batch masking is configured.

Suggested change
_MASKABLE_ATTR_KEYS = (
LangfuseOtelSpanAttributes.TRACE_INPUT,
LangfuseOtelSpanAttributes.TRACE_OUTPUT,
LangfuseOtelSpanAttributes.OBSERVATION_INPUT,
LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT,
LangfuseOtelSpanAttributes.OBSERVATION_METADATA,
)
_MASKABLE_ATTR_KEYS = (
LangfuseOtelSpanAttributes.TRACE_INPUT,
LangfuseOtelSpanAttributes.TRACE_OUTPUT,
LangfuseOtelSpanAttributes.TRACE_METADATA,
LangfuseOtelSpanAttributes.OBSERVATION_INPUT,
LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT,
LangfuseOtelSpanAttributes.OBSERVATION_METADATA,
)



def _collect_maskable_items(
    spans: Sequence[ReadableSpan],
) -> Tuple[List[Any], List[Tuple[int, str]]]:
    """Collect maskable attribute values from spans in deterministic order.

    Returns:
        (items, backref): items is the list to pass to batch_mask;
        backref[i] is (span_index, attr_key) for items[i].
    """
    collected: List[Any] = []
    origins: List[Tuple[int, str]] = []
    for index, current in enumerate(spans):
        attributes = current.attributes or {}
        for attr_key in _MASKABLE_ATTR_KEYS:
            if attr_key not in attributes:
                continue
            collected.append(attributes[attr_key])
            origins.append((index, attr_key))
    return collected, origins


def _apply_batch_mask(
batch_mask: BatchMaskFunction,
items: List[Any],
backref: List[Tuple[int, str]],
mask_batch_size: Optional[int],
) -> List[Tuple[int, str, Any]]:
"""Call batch_mask (in chunks if mask_batch_size set), return (span_idx, key, masked_value)."""
if not items:
return []
if mask_batch_size is None or mask_batch_size <= 0:
try:
masked = batch_mask(items)
except Exception as e:
langfuse_logger.error(
"Batch masking failed; exporting with fallback masking. Error: %s", e
)
masked = ["<masking failed>"] * len(items)
if len(masked) != len(items):
langfuse_logger.error(
"Batch mask returned length %s, expected %s; using fallback.",
len(masked),
len(items),
)
masked = ["<masking failed>"] * len(items)
else:
masked: List[Any] = []
for i in range(0, len(items), mask_batch_size):
chunk = items[i : i + mask_batch_size]
chunk_backref = backref[i : i + mask_batch_size]
try:
chunk_masked = batch_mask(chunk)
except Exception as e:
langfuse_logger.error(
"Batch masking chunk failed; using fallback. Error: %s", e
)
chunk_masked = ["<masking failed>"] * len(chunk)
if len(chunk_masked) != len(chunk):
langfuse_logger.error(
"Batch mask returned length %s, expected %s; using fallback.",
len(chunk_masked),
len(chunk),
)
chunk_masked = ["<masking failed>"] * len(chunk)
masked.extend(chunk_masked)
return [(backref[j][0], backref[j][1], masked[j]) for j in range(len(masked))]


def _build_masked_attributes_per_span(
num_spans: int,
masked_triples: List[Tuple[int, str, Any]],
) -> List[Dict[str, Any]]:
"""Build a masked_attributes dict for each span."""
result: List[Dict[str, Any]] = [{} for _ in range(num_spans)]
for span_idx, key, value in masked_triples:
result[span_idx][key] = value
return result


def _export_batch_sync(
    span_exporter: SpanExporter,
    batch_mask: BatchMaskFunction,
    mask_batch_size: Optional[int],
    spans: Sequence[ReadableSpan],
) -> SpanExportResult:
    """Mask the given spans in one batch and export them.

    Shared by the synchronous export path and the async worker thread. Spans
    with no maskable attributes are exported unchanged without calling
    batch_mask at all.
    """
    if not spans:
        return span_exporter.export(spans)

    items, backref = _collect_maskable_items(spans)
    if not items:
        # Nothing to mask in this batch; skip the masking round-trip entirely.
        return span_exporter.export(spans)

    per_span = _build_masked_attributes_per_span(
        len(spans),
        _apply_batch_mask(batch_mask, items, backref, mask_batch_size),
    )
    wrapped = [
        MaskedAttributeSpanWrapper(original, per_span[index])
        for index, original in enumerate(spans)
    ]
    return span_exporter.export(wrapped)


class MaskingSpanExporter(SpanExporter):
    """SpanExporter that runs batch masking before delegating to the real exporter.

    In synchronous mode (use_async_masking=False) export() masks the batch
    inline and forwards it to the wrapped exporter. In asynchronous mode a
    daemon worker thread drains a bounded queue, masks each batch, and
    forwards it, so the caller's thread never blocks on the masking backend.

    Queue protocol (async mode): an item is either a list of spans to export,
    the module-level _SHUTDOWN_SENTINEL, or a (_FLUSH_SENTINEL, Event) pair.
    Each flush request carries its own Event, so concurrent force_flush()
    callers wait on independent completion signals — a shared event would let
    one caller's wait be starved by another caller clearing it.
    """

    # Bound the queue so a slow masking backend cannot grow memory unboundedly.
    _QUEUE_MAX_SIZE = 1000
    # How long export()/force_flush() wait to enqueue before giving up.
    _QUEUE_PUT_TIMEOUT_SEC = 5.0

    def __init__(
        self,
        *,
        span_exporter: SpanExporter,
        batch_mask: BatchMaskFunction,
        mask_batch_size: Optional[int] = None,
        use_async_masking: bool = False,
    ) -> None:
        """Wrap `span_exporter` with (optionally asynchronous) batch masking.

        Args:
            span_exporter: The real exporter that receives masked spans.
            batch_mask: User-supplied function that masks a list of values.
            mask_batch_size: Max items per batch_mask call; None or <= 0
                disables chunking.
            use_async_masking: If True, mask and export on a background thread.
        """
        self._span_exporter = span_exporter
        self._batch_mask = batch_mask
        self._mask_batch_size = mask_batch_size
        self._use_async_masking = use_async_masking
        self._queue: Optional[Queue] = None
        self._worker: Optional[threading.Thread] = None
        self._closed = False

        if use_async_masking:
            self._queue = Queue(maxsize=self._QUEUE_MAX_SIZE)
            self._worker = threading.Thread(
                target=self._worker_loop,
                name="langfuse-masking-exporter",
                daemon=True,
            )
            self._worker.start()

    @staticmethod
    def _is_flush_request(item: Any) -> bool:
        """True if a queue item is a (_FLUSH_SENTINEL, Event) flush request."""
        return (
            isinstance(item, tuple)
            and len(item) == 2
            and item[0] is _FLUSH_SENTINEL
        )

    def _mask_and_export(self, spans: Sequence[ReadableSpan]) -> None:
        """Mask one batch and export it, logging (never raising) on failure."""
        try:
            _export_batch_sync(
                self._span_exporter,
                self._batch_mask,
                self._mask_batch_size,
                spans,
            )
        except Exception as e:
            langfuse_logger.error("Async batch masking/export failed: %s", e)

    def _drain_for_flush(self, flush_event: threading.Event) -> bool:
        """Export everything queued so far, then signal `flush_event`.

        Returns:
            False if a shutdown sentinel was encountered while draining (the
            worker loop should exit), True otherwise.
        """
        saw_shutdown = False
        while self._queue is not None:
            try:
                extra = self._queue.get_nowait()
            except Empty:
                break
            if extra is _SHUTDOWN_SENTINEL:
                saw_shutdown = True
                break
            if self._is_flush_request(extra):
                # A concurrent flush request is satisfied by this same drain.
                extra[1].set()
                continue
            self._mask_and_export(extra)
        flush_event.set()
        return not saw_shutdown

    def _worker_loop(self) -> None:
        """Async worker: consume queued batches until shutdown is requested."""
        if self._queue is None:
            return
        while True:
            try:
                item = self._queue.get(timeout=0.5)
            except Empty:
                # Periodic wakeup keeps the thread responsive to shutdown.
                continue
            if item is _SHUTDOWN_SENTINEL:
                break
            if self._is_flush_request(item):
                if not self._drain_for_flush(item[1]):
                    break
                continue
            self._mask_and_export(item)

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Mask and export `spans`, inline or via the async worker queue."""
        if not spans:
            return self._span_exporter.export(spans)

        if not self._use_async_masking:
            return _export_batch_sync(
                self._span_exporter,
                self._batch_mask,
                self._mask_batch_size,
                spans,
            )

        if self._queue is None or self._closed:
            return SpanExportResult.FAILURE

        try:
            # Copy to a list: the caller may reuse or mutate its sequence.
            self._queue.put(list(spans), timeout=self._QUEUE_PUT_TIMEOUT_SEC)
        except Full:
            langfuse_logger.error(
                "Masking exporter queue full; dropping batch of %s spans",
                len(spans),
            )
            return SpanExportResult.FAILURE
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        """Stop the worker (if any) and shut down the wrapped exporter."""
        self._closed = True
        if (
            self._worker is not None
            and self._worker.is_alive()
            and self._queue is not None
        ):
            self._queue.put(_SHUTDOWN_SENTINEL)
            self._worker.join(timeout=10.0)
            if self._worker.is_alive():
                langfuse_logger.warning(
                    "Masking exporter worker did not stop within timeout"
                )
        if hasattr(self._span_exporter, "shutdown"):
            self._span_exporter.shutdown()

    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
        """Flush pending batches within a single overall time budget.

        The wait for the worker drain and the wrapped exporter's own
        force_flush share one budget: after the drain completes, only the
        remaining time is passed down, so the call cannot take ~2x the
        requested timeout.

        Returns:
            True if everything flushed within the budget, False otherwise.
        """
        if not self._use_async_masking:
            if hasattr(self._span_exporter, "force_flush"):
                return self._span_exporter.force_flush(timeout_millis)
            return True

        if self._queue is None or self._closed:
            return True

        # Per-call Event: each concurrent flusher waits on its own signal.
        flush_event = threading.Event()
        start = time.monotonic()
        try:
            self._queue.put(
                (_FLUSH_SENTINEL, flush_event),
                timeout=self._QUEUE_PUT_TIMEOUT_SEC,
            )
        except Full:
            return False

        timeout_sec = (timeout_millis / 1000.0) if timeout_millis is not None else 10.0
        if not flush_event.wait(timeout=timeout_sec):
            return False

        if hasattr(self._span_exporter, "force_flush"):
            if timeout_millis is None:
                return self._span_exporter.force_flush(None)
            elapsed_ms = int((time.monotonic() - start) * 1000)
            remaining_ms = max(0, timeout_millis - elapsed_ms)
            return self._span_exporter.force_flush(remaining_ms)
        return True
Loading