Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/experimentation/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@
variant key."""

EXPOSURE_HOURLY_BUCKET_MAX_WINDOW = timedelta(hours=72)

EXPOSURES_REFRESH_MIN_INTERVAL = timedelta(minutes=5)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.2.14 on 2026-06-11 10:12

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("experimentation", "0006_experiment_exposures"),
]

operations = [
migrations.AddField(
model_name="experimentexposures",
name="refresh_requested_at",
field=models.DateTimeField(blank=True, null=True),
),
]
21 changes: 17 additions & 4 deletions api/experimentation/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@

from core.models import SoftDeleteExportableModel
from environments.models import Environment
from experimentation.tasks import (
add_environment_key_to_ingestion,
delete_environment_key_from_ingestion,
)
from experimentation.types import MetricDefinition

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -73,12 +69,16 @@ class Meta:

@hook(AFTER_CREATE) # type: ignore[misc]
def sync_to_ingestion_on_create(self) -> None:
from experimentation.tasks import add_environment_key_to_ingestion

add_environment_key_to_ingestion.delay(
kwargs={"environment_api_key": self.environment.api_key},
)

@hook(AFTER_DELETE) # type: ignore[misc]
def sync_to_ingestion_on_delete(self) -> None:
from experimentation.tasks import delete_environment_key_from_ingestion

delete_environment_key_from_ingestion.delay(
kwargs={"environment_api_key": self.environment.api_key},
)
Expand Down Expand Up @@ -143,6 +143,15 @@ class ExperimentExposures(models.Model):
models.JSONField(null=True, blank=True)
)
last_error_at = models.DateTimeField(null=True, blank=True)
refresh_requested_at = models.DateTimeField(null=True, blank=True)

@property
def is_final(self) -> bool:
# Recomputing a final row can only lose data: warehouse events expire.
ended_at = self.experiment.ended_at
return (

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also return true when an experiment has been running for more than 90 days and the first buckets data have expired ?

ended_at is not None and self.as_of is not None and self.as_of >= ended_at
)

def record_refresh(self, summary: "ExposuresSummary", as_of: datetime) -> None:
self.payload = asdict(summary)
Expand All @@ -154,6 +163,10 @@ def record_failure(self) -> None:
self.last_error_at = timezone.now()
self.save(update_fields=["last_error_at"])

def record_refresh_request(self) -> None:
self.refresh_requested_at = timezone.now()
self.save(update_fields=["refresh_requested_at"])


class MetricAggregation(models.TextChoices):
COUNT = "count", "Count"
Expand Down
7 changes: 7 additions & 0 deletions api/experimentation/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from experimentation.metric_definitions import validate_metric_definition
from experimentation.models import (
Experiment,
ExperimentExposures,
ExperimentMetric,
ExperimentStatus,
Metric,
Expand Down Expand Up @@ -245,3 +246,9 @@ class Meta:

class ExperimentListSerializer(ExperimentSerializer):
feature = ExperimentFeatureSerializer(read_only=True)


class ExperimentExposuresSerializer(serializers.ModelSerializer): # type: ignore[type-arg]
class Meta:
model = ExperimentExposures
fields = ("as_of", "last_error_at", "refresh_requested_at", "payload")
32 changes: 32 additions & 0 deletions api/experimentation/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from django.utils import timezone
from task_processor.decorators import register_task_handler

from experimentation import ingestion_sync_service
from experimentation.models import Experiment, ExperimentExposures
from experimentation.services import compute_exposures_summary


@register_task_handler()
Expand All @@ -11,3 +14,32 @@ def add_environment_key_to_ingestion(environment_api_key: str) -> None:
@register_task_handler()
def delete_environment_key_from_ingestion(environment_api_key: str) -> None:
ingestion_sync_service.delete_environment_key(environment_api_key)


@register_task_handler()
def compute_experiment_exposures(experiment_id: int) -> None:
experiment = (
Experiment.objects.select_related("environment", "feature")
.filter(id=experiment_id)
.first()
)
if experiment is None or not experiment.started_at:
return

exposures, _ = ExperimentExposures.objects.get_or_create(experiment=experiment)
if exposures.is_final:
return

as_of = experiment.ended_at or timezone.now()
try:
summary = compute_exposures_summary(
environment_key=experiment.environment.api_key,
feature_name=experiment.feature.name,
window_start=experiment.started_at,
window_end=as_of,
)
except Exception:
exposures.record_failure()
return
Comment on lines +41 to +43

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we log an error here or within record_failure ?


exposures.record_refresh(summary, as_of)
52 changes: 52 additions & 0 deletions api/experimentation/views.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import math
from typing import Any

from django.db import IntegrityError
from django.db.models import Count, Prefetch, Q, QuerySet
from django.shortcuts import get_object_or_404
from django.utils import timezone
from rest_framework import mixins, serializers, status
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
Expand All @@ -14,8 +16,10 @@

from app.pagination import CustomPagination
from environments.views import NestedEnvironmentViewSet
from experimentation.constants import EXPOSURES_REFRESH_MIN_INTERVAL
from experimentation.models import (
Experiment,
ExperimentExposures,
ExperimentMetric,
ExperimentStatus,
Metric,
Expand All @@ -28,6 +32,7 @@
WarehouseConnectionPermission,
)
from experimentation.serializers import (
ExperimentExposuresSerializer,
ExperimentListSerializer,
ExperimentMetricSerializer,
ExperimentSerializer,
Expand All @@ -43,6 +48,7 @@
refresh_warehouse_connection_status,
transition_experiment_status,
)
from experimentation.tasks import compute_experiment_exposures
from users.models import FFAdminUser

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -268,6 +274,52 @@ def pause(self, request: Request, **kwargs: object) -> Response:
def complete(self, request: Request, **kwargs: object) -> Response:
return self._transition_status(ExperimentStatus.COMPLETED)

@action(detail=True, methods=["get"])
def exposures(self, request: Request, **kwargs: object) -> Response:
experiment: Experiment = self.get_object()
exposures = getattr(experiment, "exposures", None)
return Response(
{
"exposures": (
ExperimentExposuresSerializer(exposures).data if exposures else None
),
}
)

@action(detail=True, methods=["post"], url_path="exposures/refresh")
def refresh_exposures(self, request: Request, **kwargs: object) -> Response:
experiment: Experiment = self.get_object()
if experiment.started_at is None:
return Response(
{"detail": "Cannot refresh exposures before the experiment starts."},
status=status.HTTP_400_BAD_REQUEST,
)
exposures = ExperimentExposures.objects.filter(experiment=experiment).first()
if exposures is not None and exposures.is_final:
return Response(
{"detail": "Exposures are final for this completed experiment."},
status=status.HTTP_400_BAD_REQUEST,
)
if exposures is not None and exposures.refresh_requested_at is not None:
retry_after = EXPOSURES_REFRESH_MIN_INTERVAL - (
timezone.now() - exposures.refresh_requested_at
)
if retry_after.total_seconds() > 0:
return Response(
{"detail": "A refresh was requested recently. Try again later."},
status=status.HTTP_429_TOO_MANY_REQUESTS,
headers={
"Retry-After": str(math.ceil(retry_after.total_seconds()))
},
)
if exposures is None:
exposures, _ = ExperimentExposures.objects.get_or_create(
experiment=experiment
)
exposures.record_refresh_request()
compute_experiment_exposures.delay(kwargs={"experiment_id": experiment.id})
Comment thread
gagantrivedi marked this conversation as resolved.
return Response(status=status.HTTP_202_ACCEPTED)

def _transition_status(self, target_status: str) -> Response:
experiment: Experiment = self.get_object()
try:
Expand Down
Loading
Loading