diff --git a/api/experimentation/constants.py b/api/experimentation/constants.py index 07b0eb643718..9dab5688cd0e 100644 --- a/api/experimentation/constants.py +++ b/api/experimentation/constants.py @@ -8,3 +8,5 @@ variant key.""" EXPOSURE_HOURLY_BUCKET_MAX_WINDOW = timedelta(hours=72) + +EXPOSURES_REFRESH_MIN_INTERVAL = timedelta(minutes=5) diff --git a/api/experimentation/migrations/0007_exposures_refresh_requested_at.py b/api/experimentation/migrations/0007_exposures_refresh_requested_at.py new file mode 100644 index 000000000000..8bfde16def76 --- /dev/null +++ b/api/experimentation/migrations/0007_exposures_refresh_requested_at.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.14 on 2026-06-11 10:12 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("experimentation", "0006_experiment_exposures"), + ] + + operations = [ + migrations.AddField( + model_name="experimentexposures", + name="refresh_requested_at", + field=models.DateTimeField(blank=True, null=True), + ), + ] diff --git a/api/experimentation/models.py b/api/experimentation/models.py index d9703a3c3839..5fe779007fe8 100644 --- a/api/experimentation/models.py +++ b/api/experimentation/models.py @@ -14,10 +14,6 @@ from core.models import SoftDeleteExportableModel from environments.models import Environment -from experimentation.tasks import ( - add_environment_key_to_ingestion, - delete_environment_key_from_ingestion, -) from experimentation.types import MetricDefinition if typing.TYPE_CHECKING: @@ -73,12 +69,16 @@ class Meta: @hook(AFTER_CREATE) # type: ignore[misc] def sync_to_ingestion_on_create(self) -> None: + from experimentation.tasks import add_environment_key_to_ingestion + add_environment_key_to_ingestion.delay( kwargs={"environment_api_key": self.environment.api_key}, ) @hook(AFTER_DELETE) # type: ignore[misc] def sync_to_ingestion_on_delete(self) -> None: + from experimentation.tasks import delete_environment_key_from_ingestion + delete_environment_key_from_ingestion.delay( kwargs={"environment_api_key": self.environment.api_key}, ) @@ -143,6 +143,14 @@ class ExperimentExposures(models.Model): models.JSONField(null=True, blank=True) ) last_error_at = models.DateTimeField(null=True, blank=True) + refresh_requested_at = models.DateTimeField(null=True, blank=True) + + @property + def is_final(self) -> bool: + ended_at = self.experiment.ended_at + return ( + ended_at is not None and self.as_of is not None and self.as_of >= ended_at + ) def record_refresh(self, summary: "ExposuresSummary", as_of: datetime) -> None: self.payload = asdict(summary) @@ -154,6 +162,10 @@ def record_failure(self) -> None: self.last_error_at = timezone.now() self.save(update_fields=["last_error_at"]) + def record_refresh_request(self) -> None: + self.refresh_requested_at = timezone.now() + self.save(update_fields=["refresh_requested_at"]) + class MetricAggregation(models.TextChoices): COUNT = "count", "Count" diff --git a/api/experimentation/serializers.py b/api/experimentation/serializers.py index 00fbbb99fbb6..68c7f867ef1c 100644 --- a/api/experimentation/serializers.py +++ b/api/experimentation/serializers.py @@ -7,6 +7,7 @@ from experimentation.metric_definitions import validate_metric_definition from experimentation.models import ( Experiment, + ExperimentExposures, ExperimentMetric, ExperimentStatus, Metric, @@ -245,3 +246,9 @@ class Meta: class ExperimentListSerializer(ExperimentSerializer): feature = ExperimentFeatureSerializer(read_only=True) + + +class ExperimentExposuresSerializer(serializers.ModelSerializer): # type: ignore[type-arg] + class Meta: + model = ExperimentExposures + fields = ("as_of", "last_error_at", "refresh_requested_at", "payload") diff --git a/api/experimentation/tasks.py b/api/experimentation/tasks.py index a4d954001a2c..302ef3053aff 100644 --- a/api/experimentation/tasks.py +++ b/api/experimentation/tasks.py @@ -1,6 +1,12 @@ +import structlog +from django.utils import timezone from task_processor.decorators import register_task_handler from experimentation import ingestion_sync_service +from experimentation.models import Experiment, ExperimentExposures +from experimentation.services import compute_exposures_summary + +logger = structlog.get_logger("experimentation") @register_task_handler() @@ -11,3 +17,39 @@ def add_environment_key_to_ingestion(environment_api_key: str) -> None: @register_task_handler() def delete_environment_key_from_ingestion(environment_api_key: str) -> None: ingestion_sync_service.delete_environment_key(environment_api_key) + + +@register_task_handler() +def compute_experiment_exposures(experiment_id: int) -> None: + experiment = ( + Experiment.objects.select_related("environment__project", "feature") + .filter(id=experiment_id) + .first() + ) + if experiment is None or not experiment.started_at: + return + + exposures, _ = ExperimentExposures.objects.get_or_create(experiment=experiment) + if exposures.is_final: + return + + as_of = experiment.ended_at or timezone.now() + try: + summary = compute_exposures_summary( + environment_key=experiment.environment.api_key, + feature_name=experiment.feature.name, + window_start=experiment.started_at, + window_end=as_of, + ) + except Exception as exc: + exposures.record_failure() + logger.error( + "exposures.compute_failed", + exc_info=exc, + experiment__id=experiment.id, + environment__id=experiment.environment_id, + organisation__id=experiment.environment.project.organisation_id, + ) + return + + exposures.record_refresh(summary, as_of) diff --git a/api/experimentation/views.py b/api/experimentation/views.py index e501197d3031..099037a34568 100644 --- a/api/experimentation/views.py +++ b/api/experimentation/views.py @@ -1,9 +1,11 @@ import logging +import math from typing import Any from django.db import IntegrityError from django.db.models import Count, Prefetch, Q, QuerySet from django.shortcuts import get_object_or_404 +from django.utils import timezone from rest_framework import mixins, serializers, status from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticated @@ -14,8 +16,10 @@ from app.pagination import CustomPagination from environments.views import NestedEnvironmentViewSet +from experimentation.constants import EXPOSURES_REFRESH_MIN_INTERVAL from experimentation.models import ( Experiment, + ExperimentExposures, ExperimentMetric, ExperimentStatus, Metric, @@ -28,6 +32,7 @@ WarehouseConnectionPermission, ) from experimentation.serializers import ( + ExperimentExposuresSerializer, ExperimentListSerializer, ExperimentMetricSerializer, ExperimentSerializer, @@ -43,6 +48,7 @@ refresh_warehouse_connection_status, transition_experiment_status, ) +from experimentation.tasks import compute_experiment_exposures from users.models import FFAdminUser logger = logging.getLogger(__name__) @@ -268,6 +274,52 @@ def pause(self, request: Request, **kwargs: object) -> Response: def complete(self, request: Request, **kwargs: object) -> Response: return self._transition_status(ExperimentStatus.COMPLETED) + @action(detail=True, methods=["get"]) + def exposures(self, request: Request, **kwargs: object) -> Response: + experiment: Experiment = self.get_object() + exposures = getattr(experiment, "exposures", None) + return Response( + { + "exposures": ( + ExperimentExposuresSerializer(exposures).data if exposures else None + ), + } + ) + + @action(detail=True, methods=["post"], url_path="exposures/refresh") + def refresh_exposures(self, request: Request, **kwargs: object) -> Response: + experiment: Experiment = self.get_object() + if experiment.started_at is None: + return Response( + {"detail": "Cannot refresh exposures before the experiment starts."}, + status=status.HTTP_400_BAD_REQUEST, + ) + exposures = ExperimentExposures.objects.filter(experiment=experiment).first() + if exposures is not None and exposures.is_final: + return Response( + {"detail": "Exposures are final for this completed experiment."}, + status=status.HTTP_400_BAD_REQUEST, + ) + if exposures is not None and exposures.refresh_requested_at is not None: + retry_after = EXPOSURES_REFRESH_MIN_INTERVAL - ( + timezone.now() - exposures.refresh_requested_at + ) + if retry_after.total_seconds() > 0: + return Response( + {"detail": "A refresh was requested recently. Try again later."}, + status=status.HTTP_429_TOO_MANY_REQUESTS, + headers={ + "Retry-After": str(math.ceil(retry_after.total_seconds())) + }, + ) + if exposures is None: + exposures, _ = ExperimentExposures.objects.get_or_create( + experiment=experiment + ) + exposures.record_refresh_request() + compute_experiment_exposures.delay(kwargs={"experiment_id": experiment.id}) + return Response(status=status.HTTP_202_ACCEPTED) + def _transition_status(self, target_status: str) -> Response: experiment: Experiment = self.get_object() try: diff --git a/api/tests/unit/experimentation/test_experiment_views.py b/api/tests/unit/experimentation/test_experiment_views.py index 6910544876fd..95a1d3f1eae0 100644 --- a/api/tests/unit/experimentation/test_experiment_views.py +++ b/api/tests/unit/experimentation/test_experiment_views.py @@ -1,10 +1,14 @@ from __future__ import annotations +from datetime import datetime, timedelta +from datetime import timezone as dt_timezone from typing import TYPE_CHECKING import pytest from django.db import IntegrityError from django.urls import reverse +from django.utils import timezone +from freezegun import freeze_time from pytest_mock import MockerFixture from rest_framework import status from rest_framework.test import APIClient @@ -12,8 +16,15 @@ from audit.models import AuditLog from audit.related_object_type import RelatedObjectType from environments.models import Environment -from experimentation.constants import EXPERIMENT_FLAG -from experimentation.models import Experiment, ExperimentStatus +from experimentation.constants import ( + EXPERIMENT_FLAG, + EXPOSURES_REFRESH_MIN_INTERVAL, +) +from experimentation.models import ( + Experiment, + ExperimentExposures, + ExperimentStatus, +) from features.feature_types import MULTIVARIATE from features.models import Feature from tests.types import EnableFeaturesFixture @@ -649,6 +660,314 @@ def test_action__complete__sets_ended_at( assert response.json()["ended_at"] is not None +def test_exposures__computed_row__returns_row( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given a previously computed exposures row + enable_features(EXPERIMENT_FLAG) + payload = { + "excluded_identities": 4, + "timeseries": { + "granularity": "day", + "points": [ + { + "bucket": "2026-06-01T00:00:00+00:00", + "new_identities": {"control": 310, "variant_a": 295}, + } + ], + }, + } + ExperimentExposures.objects.create( + experiment=experiment, + as_of=datetime(2026, 6, 11, 12, tzinfo=dt_timezone.utc), + payload=payload, + ) + + # When + response = admin_client_new.get(_action_url(environment, experiment, "exposures")) + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "exposures": { + "as_of": "2026-06-11T12:00:00Z", + "last_error_at": None, + "refresh_requested_at": None, + "payload": payload, + } + } + + +def test_exposures__never_computed__returns_null( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + + # When + response = admin_client_new.get(_action_url(environment, experiment, "exposures")) + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json() == {"exposures": None} + + +def test_exposures__failed_refresh__returns_error_marker_with_last_payload( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given a row whose last refresh failed after an earlier success + enable_features(EXPERIMENT_FLAG) + payload = { + "excluded_identities": 0, + "timeseries": {"granularity": "hour", "points": []}, + } + ExperimentExposures.objects.create( + experiment=experiment, + as_of=datetime(2026, 6, 11, 11, tzinfo=dt_timezone.utc), + payload=payload, + last_error_at=datetime(2026, 6, 11, 12, tzinfo=dt_timezone.utc), + ) + + # When + response = admin_client_new.get(_action_url(environment, experiment, "exposures")) + + # Then the stale data and the error marker are both surfaced + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "exposures": { + "as_of": "2026-06-11T11:00:00Z", + "last_error_at": "2026-06-11T12:00:00Z", + "refresh_requested_at": None, + "payload": payload, + } + } + + +def test_exposures__admin_without_flag__returns_403( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, +) -> None: + # Given — feature flag not enabled + + # When + response = admin_client_new.get(_action_url(environment, experiment, "exposures")) + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_exposures__staff_user_with_flag__returns_403( + staff_client: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + + # When + response = staff_client.get(_action_url(environment, experiment, "exposures")) + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_refresh_exposures__started_experiment__enqueues_compute( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then + assert response.status_code == status.HTTP_202_ACCEPTED + mock_compute.delay.assert_called_once_with( + kwargs={"experiment_id": experiment.id}, + ) + exposures = ExperimentExposures.objects.get(experiment=experiment) + assert exposures.refresh_requested_at is not None + + +@freeze_time("2026-06-11T12:00:00Z") +def test_refresh_exposures__requested_recently__returns_429_with_retry_after( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a refresh was requested a minute ago + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentExposures.objects.create( + experiment=experiment, + refresh_requested_at=timezone.now() - timedelta(minutes=1), + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then the client is told when to retry + assert response.status_code == status.HTTP_429_TOO_MANY_REQUESTS + assert response.headers["Retry-After"] == "240" + mock_compute.delay.assert_not_called() + + +def test_refresh_exposures__last_request_beyond_interval__enqueues_compute( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given the last refresh request is older than the minimum interval + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentExposures.objects.create( + experiment=experiment, + refresh_requested_at=timezone.now() - EXPOSURES_REFRESH_MIN_INTERVAL, + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then + assert response.status_code == status.HTTP_202_ACCEPTED + mock_compute.delay.assert_called_once() + + +def test_refresh_exposures__completed_with_final_row__returns_400( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a completed experiment whose row already covers the full window + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentExposures.objects.create( + experiment=experiment, + as_of=experiment.ended_at, + payload={"excluded_identities": 0}, + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then the final data cannot be recomputed away (events expire in the + # warehouse after 90 days) + assert response.status_code == status.HTTP_400_BAD_REQUEST + mock_compute.delay.assert_not_called() + + +def test_refresh_exposures__completed_with_stale_row__enqueues_compute( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a completed experiment last computed before it ended + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentExposures.objects.create( + experiment=experiment, + as_of=datetime(2026, 6, 7, tzinfo=dt_timezone.utc), + payload={"excluded_identities": 0}, + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then the finalising refresh is allowed + assert response.status_code == status.HTTP_202_ACCEPTED + mock_compute.delay.assert_called_once() + + +def test_refresh_exposures__not_started_experiment__returns_400( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a created experiment that has never started + enable_features(EXPERIMENT_FLAG) + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then + assert response.status_code == status.HTTP_400_BAD_REQUEST + mock_compute.delay.assert_not_called() + + +def test_refresh_exposures__staff_user_with_flag__returns_403( + staff_client: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + + # When + response = staff_client.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_delete__exists__returns_204_and_soft_deletes( admin_client_new: APIClient, environment: Environment, diff --git a/api/tests/unit/experimentation/test_models.py b/api/tests/unit/experimentation/test_models.py index 431e863528aa..fd3e6dab1a6e 100644 --- a/api/tests/unit/experimentation/test_models.py +++ b/api/tests/unit/experimentation/test_models.py @@ -23,7 +23,7 @@ def test_warehouse_connection__after_create__enqueues_ingestion_add_task( ) -> None: # Given mock_task = mocker.patch( - "experimentation.models.add_environment_key_to_ingestion", + "experimentation.tasks.add_environment_key_to_ingestion", ) # When @@ -45,7 +45,7 @@ def test_warehouse_connection__after_delete__enqueues_ingestion_delete_task( ) -> None: # Given mock_task = mocker.patch( - "experimentation.models.delete_environment_key_from_ingestion", + "experimentation.tasks.delete_environment_key_from_ingestion", ) environment_api_key = warehouse_connection.environment.api_key diff --git a/api/tests/unit/experimentation/test_tasks.py b/api/tests/unit/experimentation/test_tasks.py index 25e108a39f6d..a465c24bb4b1 100644 --- a/api/tests/unit/experimentation/test_tasks.py +++ b/api/tests/unit/experimentation/test_tasks.py @@ -1,7 +1,25 @@ +from dataclasses import asdict +from datetime import datetime +from datetime import timezone as dt_timezone + +from django.utils import timezone +from freezegun import freeze_time from pytest_mock import MockerFixture +from pytest_structlog import StructuredLogCapture +from experimentation.dataclasses import ( + ExposuresSummary, + ExposuresTimeseries, + ExposuresTimeseriesPoint, +) +from experimentation.models import ( + Experiment, + ExperimentExposures, + ExperimentStatus, +) from experimentation.tasks import ( add_environment_key_to_ingestion, + compute_experiment_exposures, delete_environment_key_from_ingestion, ) @@ -34,3 +52,176 @@ def test_delete_environment_key_from_ingestion__valid_key__calls_service( # Then mock_delete.assert_called_once_with("test-env-key-001") + + +def _summary() -> ExposuresSummary: + return ExposuresSummary( + excluded_identities=1, + timeseries=ExposuresTimeseries( + granularity="hour", + points=[ + ExposuresTimeseriesPoint( + bucket="2026-06-01T00:00:00+00:00", + new_identities={"control": 6, "variant_a": 4}, + ) + ], + ), + ) + + +@freeze_time("2026-06-11T12:00:00Z") +def test_compute_experiment_exposures__running_experiment__stores_summary( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a running experiment and a warehouse responding with a summary + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + mock_compute = mocker.patch( + "experimentation.tasks.compute_exposures_summary", + return_value=_summary(), + ) + + # When + compute_experiment_exposures(experiment_id=experiment.id) + + # Then the full window up to now is computed and stored on the row + mock_compute.assert_called_once_with( + environment_key=experiment.environment.api_key, + feature_name=experiment.feature.name, + window_start=experiment.started_at, + window_end=timezone.now(), + ) + exposures = ExperimentExposures.objects.get(experiment=experiment) + assert exposures.payload == asdict(_summary()) + assert exposures.as_of == timezone.now() + assert exposures.last_error_at is None + + +def test_compute_experiment_exposures__completed_experiment__window_ends_at_ended_at( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a completed experiment + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + mock_compute = mocker.patch( + "experimentation.tasks.compute_exposures_summary", + return_value=_summary(), + ) + + # When + compute_experiment_exposures(experiment_id=experiment.id) + + # Then the window is frozen at the experiment's end + mock_compute.assert_called_once_with( + environment_key=experiment.environment.api_key, + feature_name=experiment.feature.name, + window_start=experiment.started_at, + window_end=experiment.ended_at, + ) + exposures = ExperimentExposures.objects.get(experiment=experiment) + assert exposures.as_of == experiment.ended_at + + +def test_compute_experiment_exposures__warehouse_error__records_failure( + experiment: Experiment, + mocker: MockerFixture, + log: StructuredLogCapture, +) -> None: + # Given a running experiment whose row holds a previously computed payload + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + as_of = timezone.now() + ExperimentExposures.objects.create( + experiment=experiment, + as_of=as_of, + payload=asdict(_summary()), + ) + mocker.patch( + "experimentation.tasks.compute_exposures_summary", + side_effect=Exception("warehouse unreachable"), + ) + + # When + compute_experiment_exposures(experiment_id=experiment.id) + + # Then the failure is recorded and the last good payload survives + exposures = ExperimentExposures.objects.get(experiment=experiment) + assert exposures.last_error_at is not None + assert exposures.payload == asdict(_summary()) + assert exposures.as_of == as_of + # And the failure is logged for operators + assert log.has( + "exposures.compute_failed", + level="error", + experiment__id=experiment.id, + environment__id=experiment.environment_id, + organisation__id=experiment.environment.project.organisation_id, + ) + + +def test_compute_experiment_exposures__not_started_experiment__skips( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a created experiment that has never started + mock_compute = mocker.patch( + "experimentation.tasks.compute_exposures_summary", + ) + + # When + compute_experiment_exposures(experiment_id=experiment.id) + + # Then nothing is queried or stored + mock_compute.assert_not_called() + assert not ExperimentExposures.objects.filter(experiment=experiment).exists() + + +def test_compute_experiment_exposures__final_row__skips_without_recompute( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a completed experiment whose row already covers the full window + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentExposures.objects.create( + experiment=experiment, + as_of=experiment.ended_at, + payload=asdict(_summary()), + ) + mock_compute = mocker.patch( + "experimentation.tasks.compute_exposures_summary", + ) + + # When + compute_experiment_exposures(experiment_id=experiment.id) + + # Then the final payload is left untouched regardless of the caller + mock_compute.assert_not_called() + exposures = ExperimentExposures.objects.get(experiment=experiment) + assert exposures.payload == asdict(_summary()) + + +def test_compute_experiment_exposures__experiment_deleted_after_enqueue__skips( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given the experiment is deleted between enqueue and execution + experiment_id = experiment.id + experiment.delete() + mock_compute = mocker.patch( + "experimentation.tasks.compute_exposures_summary", + ) + + # When + compute_experiment_exposures(experiment_id=experiment_id) + + # Then the task exits without raising into the task processor + mock_compute.assert_not_called() diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index c0e1d4d35f5a..24a48df0d14f 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -79,6 +79,17 @@ Attributes: - `environment_api_key` - `environment_id` +### `experimentation.exposures.compute_failed` + +Logged at `error` from: + - `api/experimentation/tasks.py:46` + +Attributes: + - `environment.id` + - `exc_info` + - `experiment.id` + - `organisation.id` + ### `feature_health.feature_health_event_dismissal_not_supported` Logged at `warning` from: