From 7e3ed09a4bbd9abcf9ac53220a4e5f6716252269 Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Thu, 11 Jun 2026 14:08:23 +0530 Subject: [PATCH 1/6] feat(experimentation): experiment exposures read and refresh endpoints --- api/experimentation/models.py | 8 +- api/experimentation/serializers.py | 7 + api/experimentation/tasks.py | 44 ++++ api/experimentation/views.py | 25 +++ .../experimentation/test_experiment_views.py | 193 +++++++++++++++++- api/tests/unit/experimentation/test_models.py | 4 +- api/tests/unit/experimentation/test_tasks.py | 157 ++++++++++++++ .../observability/_events-catalogue.md | 23 +++ 8 files changed, 454 insertions(+), 7 deletions(-) diff --git a/api/experimentation/models.py b/api/experimentation/models.py index d9703a3c3839..a0787fe043e9 100644 --- a/api/experimentation/models.py +++ b/api/experimentation/models.py @@ -14,10 +14,6 @@ from core.models import SoftDeleteExportableModel from environments.models import Environment -from experimentation.tasks import ( - add_environment_key_to_ingestion, - delete_environment_key_from_ingestion, -) from experimentation.types import MetricDefinition if typing.TYPE_CHECKING: @@ -73,12 +69,16 @@ class Meta: @hook(AFTER_CREATE) # type: ignore[misc] def sync_to_ingestion_on_create(self) -> None: + from experimentation.tasks import add_environment_key_to_ingestion + add_environment_key_to_ingestion.delay( kwargs={"environment_api_key": self.environment.api_key}, ) @hook(AFTER_DELETE) # type: ignore[misc] def sync_to_ingestion_on_delete(self) -> None: + from experimentation.tasks import delete_environment_key_from_ingestion + delete_environment_key_from_ingestion.delay( kwargs={"environment_api_key": self.environment.api_key}, ) diff --git a/api/experimentation/serializers.py b/api/experimentation/serializers.py index 00fbbb99fbb6..c690a57510bd 100644 --- a/api/experimentation/serializers.py +++ b/api/experimentation/serializers.py @@ -7,6 +7,7 @@ from experimentation.metric_definitions import validate_metric_definition from experimentation.models import ( Experiment, + ExperimentExposures, ExperimentMetric, ExperimentStatus, Metric, @@ -245,3 +246,9 @@ class Meta: class ExperimentListSerializer(ExperimentSerializer): feature = ExperimentFeatureSerializer(read_only=True) + + +class ExperimentExposuresSerializer(serializers.ModelSerializer): # type: ignore[type-arg] + class Meta: + model = ExperimentExposures + fields = ("as_of", "last_error_at", "payload") diff --git a/api/experimentation/tasks.py b/api/experimentation/tasks.py index a4d954001a2c..c98d6bfb0ce3 100644 --- a/api/experimentation/tasks.py +++ b/api/experimentation/tasks.py @@ -1,6 +1,12 @@ +import structlog +from django.utils import timezone from task_processor.decorators import register_task_handler from experimentation import ingestion_sync_service +from experimentation.models import Experiment, ExperimentExposures +from experimentation.services import compute_exposures_summary + +logger = structlog.get_logger("experimentation") @register_task_handler() @@ -11,3 +17,41 @@ def add_environment_key_to_ingestion(environment_api_key: str) -> None: @register_task_handler() def delete_environment_key_from_ingestion(environment_api_key: str) -> None: ingestion_sync_service.delete_environment_key(environment_api_key) + + +@register_task_handler() +def compute_experiment_exposures(experiment_id: int) -> None: + experiment = Experiment.objects.select_related( + "environment__project", + "feature", + ).get(id=experiment_id) + if not experiment.started_at: + return + + log = logger.bind( + experiment__id=experiment.id, + environment__id=experiment.environment_id, + organisation__id=experiment.environment.project.organisation_id, + ) + exposures, _ = ExperimentExposures.objects.get_or_create(experiment=experiment) + as_of = experiment.ended_at or timezone.now() + try: + summary = compute_exposures_summary( + environment_key=experiment.environment.api_key, + feature_name=experiment.feature.name, + window_start=experiment.started_at, + window_end=as_of, + ) + except Exception as exc: + exposures.record_failure() + log.error("exposures.compute_failed", exc_info=exc) + return + + exposures.record_refresh(summary, as_of) + log.info( + "exposures.computed", + identities__count=sum( + sum(point.new_identities.values()) for point in summary.timeseries.points + ), + excluded_identities__count=summary.excluded_identities, + ) diff --git a/api/experimentation/views.py b/api/experimentation/views.py index e501197d3031..1c5b67b9f8d5 100644 --- a/api/experimentation/views.py +++ b/api/experimentation/views.py @@ -28,6 +28,7 @@ WarehouseConnectionPermission, ) from experimentation.serializers import ( + ExperimentExposuresSerializer, ExperimentListSerializer, ExperimentMetricSerializer, ExperimentSerializer, @@ -43,6 +44,7 @@ refresh_warehouse_connection_status, transition_experiment_status, ) +from experimentation.tasks import compute_experiment_exposures from users.models import FFAdminUser logger = logging.getLogger(__name__) @@ -268,6 +270,29 @@ def pause(self, request: Request, **kwargs: object) -> Response: def complete(self, request: Request, **kwargs: object) -> Response: return self._transition_status(ExperimentStatus.COMPLETED) + @action(detail=True, methods=["get"]) + def exposures(self, request: Request, **kwargs: object) -> Response: + experiment: Experiment = self.get_object() + exposures = getattr(experiment, "exposures", None) + return Response( + { + "exposures": ( + ExperimentExposuresSerializer(exposures).data if exposures else None + ), + } + ) + + @action(detail=True, methods=["post"], url_path="exposures/refresh") + def refresh_exposures(self, request: Request, **kwargs: object) -> Response: + experiment: Experiment = self.get_object() + if experiment.started_at is None: + return Response( + {"detail": "Cannot refresh exposures before the experiment starts."}, + status=status.HTTP_400_BAD_REQUEST, + ) + compute_experiment_exposures.delay(kwargs={"experiment_id": experiment.id}) + return Response(status=status.HTTP_202_ACCEPTED) + def _transition_status(self, target_status: str) -> Response: experiment: Experiment = self.get_object() try: diff --git a/api/tests/unit/experimentation/test_experiment_views.py b/api/tests/unit/experimentation/test_experiment_views.py index 6910544876fd..86a14e4cca37 100644 --- a/api/tests/unit/experimentation/test_experiment_views.py +++ b/api/tests/unit/experimentation/test_experiment_views.py @@ -1,5 +1,7 @@ from __future__ import annotations +from datetime import datetime +from datetime import timezone as dt_timezone from typing import TYPE_CHECKING import pytest @@ -13,7 +15,11 @@ from audit.related_object_type import RelatedObjectType from environments.models import Environment from experimentation.constants import EXPERIMENT_FLAG -from experimentation.models import Experiment, ExperimentStatus +from experimentation.models import ( + Experiment, + ExperimentExposures, + ExperimentStatus, +) from features.feature_types import MULTIVARIATE from features.models import Feature from tests.types import EnableFeaturesFixture @@ -649,6 +655,191 @@ def test_action__complete__sets_ended_at( assert response.json()["ended_at"] is not None +def test_exposures__computed_row__returns_row( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given a previously computed exposures row + enable_features(EXPERIMENT_FLAG) + payload = { + "excluded_identities": 4, + "timeseries": { + "granularity": "day", + "points": [ + { + "bucket": "2026-06-01T00:00:00+00:00", + "new_identities": {"control": 310, "variant_a": 295}, + } + ], + }, + } + ExperimentExposures.objects.create( + experiment=experiment, + as_of=datetime(2026, 6, 11, 12, tzinfo=dt_timezone.utc), + payload=payload, + ) + + # When + response = admin_client_new.get(_action_url(environment, experiment, "exposures")) + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "exposures": { + "as_of": "2026-06-11T12:00:00Z", + "last_error_at": None, + "payload": payload, + } + } + + +def test_exposures__never_computed__returns_null( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + + # When + response = admin_client_new.get(_action_url(environment, experiment, "exposures")) + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json() == {"exposures": None} + + +def test_exposures__failed_refresh__returns_error_marker_with_last_payload( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given a row whose last refresh failed after an earlier success + enable_features(EXPERIMENT_FLAG) + payload = { + "excluded_identities": 0, + "timeseries": {"granularity": "hour", "points": []}, + } + ExperimentExposures.objects.create( + experiment=experiment, + as_of=datetime(2026, 6, 11, 11, tzinfo=dt_timezone.utc), + payload=payload, + last_error_at=datetime(2026, 6, 11, 12, tzinfo=dt_timezone.utc), + ) + + # When + response = admin_client_new.get(_action_url(environment, experiment, "exposures")) + + # Then the stale data and the error marker are both surfaced + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "exposures": { + "as_of": "2026-06-11T11:00:00Z", + "last_error_at": "2026-06-11T12:00:00Z", + "payload": payload, + } + } + + +def test_exposures__admin_without_flag__returns_403( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, +) -> None: + # Given — feature flag not enabled + + # When + response = admin_client_new.get(_action_url(environment, experiment, "exposures")) + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_exposures__staff_user_with_flag__returns_403( + staff_client: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + + # When + response = staff_client.get(_action_url(environment, experiment, "exposures")) + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_refresh_exposures__started_experiment__enqueues_compute( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then + assert response.status_code == status.HTTP_202_ACCEPTED + mock_compute.delay.assert_called_once_with( + kwargs={"experiment_id": experiment.id}, + ) + + +def test_refresh_exposures__not_started_experiment__returns_400( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a created experiment that has never started + enable_features(EXPERIMENT_FLAG) + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then + assert response.status_code == status.HTTP_400_BAD_REQUEST + mock_compute.delay.assert_not_called() + + +def test_refresh_exposures__staff_user_with_flag__returns_403( + staff_client: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + + # When + response = staff_client.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_delete__exists__returns_204_and_soft_deletes( admin_client_new: APIClient, environment: Environment, diff --git a/api/tests/unit/experimentation/test_models.py b/api/tests/unit/experimentation/test_models.py index 431e863528aa..fd3e6dab1a6e 100644 --- a/api/tests/unit/experimentation/test_models.py +++ b/api/tests/unit/experimentation/test_models.py @@ -23,7 +23,7 @@ def test_warehouse_connection__after_create__enqueues_ingestion_add_task( ) -> None: # Given mock_task = mocker.patch( - "experimentation.models.add_environment_key_to_ingestion", + "experimentation.tasks.add_environment_key_to_ingestion", ) # When @@ -45,7 +45,7 @@ def test_warehouse_connection__after_delete__enqueues_ingestion_delete_task( ) -> None: # Given mock_task = mocker.patch( - "experimentation.models.delete_environment_key_from_ingestion", + "experimentation.tasks.delete_environment_key_from_ingestion", ) environment_api_key = warehouse_connection.environment.api_key diff --git a/api/tests/unit/experimentation/test_tasks.py b/api/tests/unit/experimentation/test_tasks.py index 25e108a39f6d..46cd3239528d 100644 --- a/api/tests/unit/experimentation/test_tasks.py +++ b/api/tests/unit/experimentation/test_tasks.py @@ -1,7 +1,25 @@ +from dataclasses import asdict +from datetime import datetime +from datetime import timezone as dt_timezone + +from django.utils import timezone +from freezegun import freeze_time from pytest_mock import MockerFixture +from pytest_structlog import StructuredLogCapture +from experimentation.dataclasses import ( + ExposuresSummary, + ExposuresTimeseries, + ExposuresTimeseriesPoint, +) +from experimentation.models import ( + Experiment, + ExperimentExposures, + ExperimentStatus, +) from experimentation.tasks import ( add_environment_key_to_ingestion, + compute_experiment_exposures, delete_environment_key_from_ingestion, ) @@ -34,3 +52,142 @@ def test_delete_environment_key_from_ingestion__valid_key__calls_service( # Then mock_delete.assert_called_once_with("test-env-key-001") + + +def _summary() -> ExposuresSummary: + return ExposuresSummary( + excluded_identities=1, + timeseries=ExposuresTimeseries( + granularity="hour", + points=[ + ExposuresTimeseriesPoint( + bucket="2026-06-01T00:00:00+00:00", + new_identities={"control": 6, "variant_a": 4}, + ) + ], + ), + ) + + +@freeze_time("2026-06-11T12:00:00Z") +def test_compute_experiment_exposures__running_experiment__stores_summary( + experiment: Experiment, + mocker: MockerFixture, + log: StructuredLogCapture, +) -> None: + # Given a running experiment and a warehouse responding with a summary + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + mock_compute = mocker.patch( + "experimentation.tasks.compute_exposures_summary", + return_value=_summary(), + ) + + # When + compute_experiment_exposures(experiment_id=experiment.id) + + # Then the full window up to now is computed and stored on the row + mock_compute.assert_called_once_with( + environment_key=experiment.environment.api_key, + feature_name=experiment.feature.name, + window_start=experiment.started_at, + window_end=timezone.now(), + ) + exposures = ExperimentExposures.objects.get(experiment=experiment) + assert exposures.payload == asdict(_summary()) + assert exposures.as_of == timezone.now() + assert exposures.last_error_at is None + assert log.events == [ + { + "level": "info", + "event": "exposures.computed", + "experiment__id": experiment.id, + "environment__id": experiment.environment_id, + "organisation__id": experiment.environment.project.organisation_id, + "identities__count": 10, + "excluded_identities__count": 1, + } + ] + + +def test_compute_experiment_exposures__completed_experiment__window_ends_at_ended_at( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a completed experiment + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + mock_compute = mocker.patch( + "experimentation.tasks.compute_exposures_summary", + return_value=_summary(), + ) + + # When + compute_experiment_exposures(experiment_id=experiment.id) + + # Then the window is frozen at the experiment's end + mock_compute.assert_called_once_with( + environment_key=experiment.environment.api_key, + feature_name=experiment.feature.name, + window_start=experiment.started_at, + window_end=experiment.ended_at, + ) + exposures = ExperimentExposures.objects.get(experiment=experiment) + assert exposures.as_of == experiment.ended_at + + +def test_compute_experiment_exposures__warehouse_error__records_failure( + experiment: Experiment, + mocker: MockerFixture, + log: StructuredLogCapture, +) -> None: + # Given a running experiment whose row holds a previously computed payload + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + as_of = timezone.now() + ExperimentExposures.objects.create( + experiment=experiment, + as_of=as_of, + payload=asdict(_summary()), + ) + mocker.patch( + "experimentation.tasks.compute_exposures_summary", + side_effect=Exception("warehouse unreachable"), + ) + + # When + compute_experiment_exposures(experiment_id=experiment.id) + + # Then the failure is recorded and the last good payload survives + exposures = ExperimentExposures.objects.get(experiment=experiment) + assert exposures.last_error_at is not None + assert exposures.payload == asdict(_summary()) + assert exposures.as_of == as_of + assert log.has( + "exposures.compute_failed", + level="error", + experiment__id=experiment.id, + environment__id=experiment.environment_id, + organisation__id=experiment.environment.project.organisation_id, + ) + + +def test_compute_experiment_exposures__not_started_experiment__skips( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a created experiment that has never started + mock_compute = mocker.patch( + "experimentation.tasks.compute_exposures_summary", + ) + + # When + compute_experiment_exposures(experiment_id=experiment.id) + + # Then nothing is queried or stored + mock_compute.assert_not_called() + assert not ExperimentExposures.objects.filter(experiment=experiment).exists() diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index c0e1d4d35f5a..001ed5a876f5 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -79,6 +79,29 @@ Attributes: - `environment_api_key` - `environment_id` +### `experimentation.exposures.compute_failed` + +Logged at `error` from: + - `api/experimentation/tasks.py:47` + +Attributes: + - `environment.id` + - `exc_info` + - `experiment.id` + - `organisation.id` + +### `experimentation.exposures.computed` + +Logged at `info` from: + - `api/experimentation/tasks.py:51` + +Attributes: + - `environment.id` + - `excluded_identities.count` + - `experiment.id` + - `identities.count` + - `organisation.id` + ### `feature_health.feature_health_event_dismissal_not_supported` Logged at `warning` from: From c5b9474133860edf8cd9fb60c0480b1cb09bd6ac Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Thu, 11 Jun 2026 15:46:09 +0530 Subject: [PATCH 2/6] feat(experimentation): throttle exposure refreshes and guard final rows --- api/experimentation/constants.py | 2 + .../0007_exposures_refresh_requested_at.py | 18 +++ api/experimentation/models.py | 5 + api/experimentation/serializers.py | 2 +- api/experimentation/tasks.py | 11 +- api/experimentation/views.py | 23 ++++ .../experimentation/test_experiment_views.py | 127 +++++++++++++++++- api/tests/unit/experimentation/test_tasks.py | 18 +++ .../observability/_events-catalogue.md | 4 +- 9 files changed, 201 insertions(+), 9 deletions(-) create mode 100644 api/experimentation/migrations/0007_exposures_refresh_requested_at.py diff --git a/api/experimentation/constants.py b/api/experimentation/constants.py index 07b0eb643718..9dab5688cd0e 100644 --- a/api/experimentation/constants.py +++ b/api/experimentation/constants.py @@ -8,3 +8,5 @@ variant key.""" EXPOSURE_HOURLY_BUCKET_MAX_WINDOW = timedelta(hours=72) + +EXPOSURES_REFRESH_MIN_INTERVAL = timedelta(minutes=5) diff --git a/api/experimentation/migrations/0007_exposures_refresh_requested_at.py b/api/experimentation/migrations/0007_exposures_refresh_requested_at.py new file mode 100644 index 000000000000..8bfde16def76 --- /dev/null +++ b/api/experimentation/migrations/0007_exposures_refresh_requested_at.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.14 on 2026-06-11 10:12 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("experimentation", "0006_experiment_exposures"), + ] + + operations = [ + migrations.AddField( + model_name="experimentexposures", + name="refresh_requested_at", + field=models.DateTimeField(blank=True, null=True), + ), + ] diff --git a/api/experimentation/models.py b/api/experimentation/models.py index a0787fe043e9..5c8d6a3f3c92 100644 --- a/api/experimentation/models.py +++ b/api/experimentation/models.py @@ -143,6 +143,7 @@ class ExperimentExposures(models.Model): models.JSONField(null=True, blank=True) ) last_error_at = models.DateTimeField(null=True, blank=True) + refresh_requested_at = models.DateTimeField(null=True, blank=True) def record_refresh(self, summary: "ExposuresSummary", as_of: datetime) -> None: self.payload = asdict(summary) @@ -154,6 +155,10 @@ def record_failure(self) -> None: self.last_error_at = timezone.now() self.save(update_fields=["last_error_at"]) + def record_refresh_request(self) -> None: + self.refresh_requested_at = timezone.now() + self.save(update_fields=["refresh_requested_at"]) + class MetricAggregation(models.TextChoices): COUNT = "count", "Count" diff --git a/api/experimentation/serializers.py b/api/experimentation/serializers.py index c690a57510bd..68c7f867ef1c 100644 --- a/api/experimentation/serializers.py +++ b/api/experimentation/serializers.py @@ -251,4 +251,4 @@ class ExperimentListSerializer(ExperimentSerializer): class ExperimentExposuresSerializer(serializers.ModelSerializer): # type: ignore[type-arg] class Meta: model = ExperimentExposures - fields = ("as_of", "last_error_at", "payload") + fields = ("as_of", "last_error_at", "refresh_requested_at", "payload") diff --git a/api/experimentation/tasks.py b/api/experimentation/tasks.py index c98d6bfb0ce3..f3ae6f585172 100644 --- a/api/experimentation/tasks.py +++ b/api/experimentation/tasks.py @@ -21,11 +21,12 @@ def delete_environment_key_from_ingestion(environment_api_key: str) -> None: @register_task_handler() def compute_experiment_exposures(experiment_id: int) -> None: - experiment = Experiment.objects.select_related( - "environment__project", - "feature", - ).get(id=experiment_id) - if not experiment.started_at: + experiment = ( + Experiment.objects.select_related("environment__project", "feature") + .filter(id=experiment_id) + .first() + ) + if experiment is None or not experiment.started_at: return log = logger.bind( diff --git a/api/experimentation/views.py b/api/experimentation/views.py index 1c5b67b9f8d5..66cbdea0296c 100644 --- a/api/experimentation/views.py +++ b/api/experimentation/views.py @@ -4,6 +4,7 @@ from django.db import IntegrityError from django.db.models import Count, Prefetch, Q, QuerySet from django.shortcuts import get_object_or_404 +from django.utils import timezone from rest_framework import mixins, serializers, status from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticated @@ -14,8 +15,10 @@ from app.pagination import CustomPagination from environments.views import NestedEnvironmentViewSet +from experimentation.constants import EXPOSURES_REFRESH_MIN_INTERVAL from experimentation.models import ( Experiment, + ExperimentExposures, ExperimentMetric, ExperimentStatus, Metric, @@ -290,6 +293,26 @@ def refresh_exposures(self, request: Request, **kwargs: object) -> Response: {"detail": "Cannot refresh exposures before the experiment starts."}, status=status.HTTP_400_BAD_REQUEST, ) + exposures, _ = ExperimentExposures.objects.get_or_create(experiment=experiment) + if ( + experiment.ended_at is not None + and exposures.as_of is not None + and exposures.as_of >= experiment.ended_at + ): + return Response( + {"detail": "Exposures are final for this completed experiment."}, + status=status.HTTP_400_BAD_REQUEST, + ) + if ( + exposures.refresh_requested_at is not None + and timezone.now() - exposures.refresh_requested_at + < EXPOSURES_REFRESH_MIN_INTERVAL + ): + return Response( + {"detail": "A refresh was requested recently. Try again later."}, + status=status.HTTP_429_TOO_MANY_REQUESTS, + ) + exposures.record_refresh_request() compute_experiment_exposures.delay(kwargs={"experiment_id": experiment.id}) return Response(status=status.HTTP_202_ACCEPTED) diff --git a/api/tests/unit/experimentation/test_experiment_views.py b/api/tests/unit/experimentation/test_experiment_views.py index 86a14e4cca37..70477194c88a 100644 --- a/api/tests/unit/experimentation/test_experiment_views.py +++ b/api/tests/unit/experimentation/test_experiment_views.py @@ -7,6 +7,7 @@ import pytest from django.db import IntegrityError from django.urls import reverse +from django.utils import timezone from pytest_mock import MockerFixture from rest_framework import status from rest_framework.test import APIClient @@ -14,7 +15,10 @@ from audit.models import AuditLog from audit.related_object_type import RelatedObjectType from environments.models import Environment -from experimentation.constants import EXPERIMENT_FLAG +from experimentation.constants import ( + EXPERIMENT_FLAG, + EXPOSURES_REFRESH_MIN_INTERVAL, +) from experimentation.models import ( Experiment, ExperimentExposures, @@ -690,6 +694,7 @@ def test_exposures__computed_row__returns_row( "exposures": { "as_of": "2026-06-11T12:00:00Z", "last_error_at": None, + "refresh_requested_at": None, "payload": payload, } } @@ -740,6 +745,7 @@ def test_exposures__failed_refresh__returns_error_marker_with_last_payload( "exposures": { "as_of": "2026-06-11T11:00:00Z", "last_error_at": "2026-06-11T12:00:00Z", + "refresh_requested_at": None, "payload": payload, } } @@ -799,6 +805,125 @@ def test_refresh_exposures__started_experiment__enqueues_compute( mock_compute.delay.assert_called_once_with( kwargs={"experiment_id": experiment.id}, ) + exposures = ExperimentExposures.objects.get(experiment=experiment) + assert exposures.refresh_requested_at is not None + + +def test_refresh_exposures__requested_recently__returns_429( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a refresh was requested moments ago + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentExposures.objects.create( + experiment=experiment, + refresh_requested_at=timezone.now(), + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then + assert response.status_code == status.HTTP_429_TOO_MANY_REQUESTS + mock_compute.delay.assert_not_called() + + +def test_refresh_exposures__last_request_beyond_interval__enqueues_compute( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given the last refresh request is older than the minimum interval + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentExposures.objects.create( + experiment=experiment, + refresh_requested_at=timezone.now() - EXPOSURES_REFRESH_MIN_INTERVAL, + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then + assert response.status_code == status.HTTP_202_ACCEPTED + mock_compute.delay.assert_called_once() + + +def test_refresh_exposures__completed_with_final_row__returns_400( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a completed experiment whose row already covers the full window + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentExposures.objects.create( + experiment=experiment, + as_of=experiment.ended_at, + payload={"excluded_identities": 0}, + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then the final data cannot be recomputed away (events expire in the + # warehouse after 90 days) + assert response.status_code == status.HTTP_400_BAD_REQUEST + mock_compute.delay.assert_not_called() + + +def test_refresh_exposures__completed_with_stale_row__enqueues_compute( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a completed experiment last computed before it ended + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentExposures.objects.create( + experiment=experiment, + as_of=datetime(2026, 6, 7, tzinfo=dt_timezone.utc), + payload={"excluded_identities": 0}, + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-exposures") + ) + + # Then the finalising refresh is allowed + assert response.status_code == status.HTTP_202_ACCEPTED + mock_compute.delay.assert_called_once() def test_refresh_exposures__not_started_experiment__returns_400( diff --git a/api/tests/unit/experimentation/test_tasks.py b/api/tests/unit/experimentation/test_tasks.py index 46cd3239528d..8d000c666f70 100644 --- a/api/tests/unit/experimentation/test_tasks.py +++ b/api/tests/unit/experimentation/test_tasks.py @@ -191,3 +191,21 @@ def test_compute_experiment_exposures__not_started_experiment__skips( # Then nothing is queried or stored mock_compute.assert_not_called() assert not ExperimentExposures.objects.filter(experiment=experiment).exists() + + +def test_compute_experiment_exposures__experiment_deleted_after_enqueue__skips( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given the experiment is deleted between enqueue and execution + experiment_id = experiment.id + experiment.delete() + mock_compute = mocker.patch( + "experimentation.tasks.compute_exposures_summary", + ) + + # When + compute_experiment_exposures(experiment_id=experiment_id) + + # Then the task exits without raising into the task processor + mock_compute.assert_not_called() diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index 001ed5a876f5..103591a845ca 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -82,7 +82,7 @@ Attributes: ### `experimentation.exposures.compute_failed` Logged at `error` from: - - `api/experimentation/tasks.py:47` + - `api/experimentation/tasks.py:48` Attributes: - `environment.id` @@ -93,7 +93,7 @@ Attributes: ### `experimentation.exposures.computed` Logged at `info` from: - - `api/experimentation/tasks.py:51` + - `api/experimentation/tasks.py:52` Attributes: - `environment.id` From 9d73293253eecbe3777e6521a697c2968a792a03 Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Thu, 11 Jun 2026 16:20:49 +0530 Subject: [PATCH 3/6] refactor(experimentation): centralise exposures finality on the model --- api/experimentation/models.py | 8 +++++ api/experimentation/tasks.py | 3 ++ api/experimentation/views.py | 32 +++++++++++-------- .../experimentation/test_experiment_views.py | 13 +++++--- api/tests/unit/experimentation/test_tasks.py | 27 ++++++++++++++++ .../observability/_events-catalogue.md | 4 +-- 6 files changed, 66 insertions(+), 21 deletions(-) diff --git a/api/experimentation/models.py b/api/experimentation/models.py index 5c8d6a3f3c92..acaf228fa85e 100644 --- a/api/experimentation/models.py +++ b/api/experimentation/models.py @@ -145,6 +145,14 @@ class ExperimentExposures(models.Model): last_error_at = models.DateTimeField(null=True, blank=True) refresh_requested_at = models.DateTimeField(null=True, blank=True) + @property + def is_final(self) -> bool: + # Recomputing a final row can only lose data: warehouse events expire. + ended_at = self.experiment.ended_at + return ( + ended_at is not None and self.as_of is not None and self.as_of >= ended_at + ) + def record_refresh(self, summary: "ExposuresSummary", as_of: datetime) -> None: self.payload = asdict(summary) self.as_of = as_of diff --git a/api/experimentation/tasks.py b/api/experimentation/tasks.py index f3ae6f585172..27db17c6ab08 100644 --- a/api/experimentation/tasks.py +++ b/api/experimentation/tasks.py @@ -35,6 +35,9 @@ def compute_experiment_exposures(experiment_id: int) -> None: organisation__id=experiment.environment.project.organisation_id, ) exposures, _ = ExperimentExposures.objects.get_or_create(experiment=experiment) + if exposures.is_final: + return + as_of = experiment.ended_at or timezone.now() try: summary = compute_exposures_summary( diff --git a/api/experimentation/views.py b/api/experimentation/views.py index 66cbdea0296c..099037a34568 100644 --- a/api/experimentation/views.py +++ b/api/experimentation/views.py @@ -1,4 +1,5 @@ import logging +import math from typing import Any from django.db import IntegrityError @@ -293,24 +294,27 @@ def refresh_exposures(self, request: Request, **kwargs: object) -> Response: {"detail": "Cannot refresh exposures before the experiment starts."}, status=status.HTTP_400_BAD_REQUEST, ) - exposures, _ = ExperimentExposures.objects.get_or_create(experiment=experiment) - if ( - experiment.ended_at is not None - and exposures.as_of is not None - and exposures.as_of >= experiment.ended_at - ): + exposures = ExperimentExposures.objects.filter(experiment=experiment).first() + if exposures is not None and exposures.is_final: return Response( {"detail": "Exposures are final for this completed experiment."}, status=status.HTTP_400_BAD_REQUEST, ) - if ( - exposures.refresh_requested_at is not None - and timezone.now() - exposures.refresh_requested_at - < EXPOSURES_REFRESH_MIN_INTERVAL - ): - return Response( - {"detail": "A refresh was requested recently. Try again later."}, - status=status.HTTP_429_TOO_MANY_REQUESTS, + if exposures is not None and exposures.refresh_requested_at is not None: + retry_after = EXPOSURES_REFRESH_MIN_INTERVAL - ( + timezone.now() - exposures.refresh_requested_at + ) + if retry_after.total_seconds() > 0: + return Response( + {"detail": "A refresh was requested recently. Try again later."}, + status=status.HTTP_429_TOO_MANY_REQUESTS, + headers={ + "Retry-After": str(math.ceil(retry_after.total_seconds())) + }, + ) + if exposures is None: + exposures, _ = ExperimentExposures.objects.get_or_create( + experiment=experiment ) exposures.record_refresh_request() compute_experiment_exposures.delay(kwargs={"experiment_id": experiment.id}) diff --git a/api/tests/unit/experimentation/test_experiment_views.py b/api/tests/unit/experimentation/test_experiment_views.py index 70477194c88a..95a1d3f1eae0 100644 --- a/api/tests/unit/experimentation/test_experiment_views.py +++ b/api/tests/unit/experimentation/test_experiment_views.py @@ -1,6 +1,6 @@ from __future__ import annotations -from datetime import datetime +from datetime import datetime, timedelta from datetime import timezone as dt_timezone from typing import TYPE_CHECKING @@ -8,6 +8,7 @@ from django.db import IntegrityError from django.urls import reverse from django.utils import timezone +from freezegun import freeze_time from pytest_mock import MockerFixture from rest_framework import status from rest_framework.test import APIClient @@ -809,21 +810,22 @@ def test_refresh_exposures__started_experiment__enqueues_compute( assert exposures.refresh_requested_at is not None -def test_refresh_exposures__requested_recently__returns_429( +@freeze_time("2026-06-11T12:00:00Z") +def test_refresh_exposures__requested_recently__returns_429_with_retry_after( admin_client_new: APIClient, environment: Environment, experiment: Experiment, enable_features: EnableFeaturesFixture, mocker: MockerFixture, ) -> None: - # Given a refresh was requested moments ago + # Given a refresh was requested a minute ago enable_features(EXPERIMENT_FLAG) experiment.status = ExperimentStatus.RUNNING experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) experiment.save() ExperimentExposures.objects.create( experiment=experiment, - refresh_requested_at=timezone.now(), + refresh_requested_at=timezone.now() - timedelta(minutes=1), ) mock_compute = mocker.patch("experimentation.views.compute_experiment_exposures") @@ -832,8 +834,9 @@ def test_refresh_exposures__requested_recently__returns_429( _action_url(environment, experiment, "refresh-exposures") ) - # Then + # Then the client is told when to retry assert response.status_code == status.HTTP_429_TOO_MANY_REQUESTS + assert response.headers["Retry-After"] == "240" mock_compute.delay.assert_not_called() diff --git a/api/tests/unit/experimentation/test_tasks.py b/api/tests/unit/experimentation/test_tasks.py index 8d000c666f70..58159dbce2fa 100644 --- a/api/tests/unit/experimentation/test_tasks.py +++ b/api/tests/unit/experimentation/test_tasks.py @@ -193,6 +193,33 @@ def test_compute_experiment_exposures__not_started_experiment__skips( assert not ExperimentExposures.objects.filter(experiment=experiment).exists() +def test_compute_experiment_exposures__final_row__skips_without_recompute( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a completed experiment whose row already covers the full window + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentExposures.objects.create( + experiment=experiment, + as_of=experiment.ended_at, + payload=asdict(_summary()), + ) + mock_compute = mocker.patch( + "experimentation.tasks.compute_exposures_summary", + ) + + # When + compute_experiment_exposures(experiment_id=experiment.id) + + # Then the final payload is left untouched regardless of the caller + mock_compute.assert_not_called() + exposures = ExperimentExposures.objects.get(experiment=experiment) + assert exposures.payload == asdict(_summary()) + + def test_compute_experiment_exposures__experiment_deleted_after_enqueue__skips( experiment: Experiment, mocker: MockerFixture, diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index 103591a845ca..14d7d21e8a5c 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -82,7 +82,7 @@ Attributes: ### `experimentation.exposures.compute_failed` Logged at `error` from: - - `api/experimentation/tasks.py:48` + - `api/experimentation/tasks.py:51` Attributes: - `environment.id` @@ -93,7 +93,7 @@ Attributes: ### `experimentation.exposures.computed` Logged at `info` from: - - `api/experimentation/tasks.py:52` + - `api/experimentation/tasks.py:55` Attributes: - `environment.id` From c48b818e4a2fb455cf46b9a58e4eb825c5daa71f Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Thu, 11 Jun 2026 16:44:31 +0530 Subject: [PATCH 4/6] refactor(experimentation): drop task logging for now --- api/experimentation/tasks.py | 20 ++-------------- api/tests/unit/experimentation/test_tasks.py | 21 ----------------- .../observability/_events-catalogue.md | 23 ------------------- 3 files changed, 2 insertions(+), 62 deletions(-) diff --git a/api/experimentation/tasks.py b/api/experimentation/tasks.py index 27db17c6ab08..9cde410bed10 100644 --- a/api/experimentation/tasks.py +++ b/api/experimentation/tasks.py @@ -1,4 +1,3 @@ -import structlog from django.utils import timezone from task_processor.decorators import register_task_handler @@ -6,8 +5,6 @@ from experimentation.models import Experiment, ExperimentExposures from experimentation.services import compute_exposures_summary -logger = structlog.get_logger("experimentation") - @register_task_handler() def add_environment_key_to_ingestion(environment_api_key: str) -> None: @@ -22,18 +19,13 @@ def delete_environment_key_from_ingestion(environment_api_key: str) -> None: @register_task_handler() def compute_experiment_exposures(experiment_id: int) -> None: experiment = ( - Experiment.objects.select_related("environment__project", "feature") + Experiment.objects.select_related("environment", "feature") .filter(id=experiment_id) .first() ) if experiment is None or not experiment.started_at: return - log = logger.bind( - experiment__id=experiment.id, - environment__id=experiment.environment_id, - organisation__id=experiment.environment.project.organisation_id, - ) exposures, _ = ExperimentExposures.objects.get_or_create(experiment=experiment) if exposures.is_final: return @@ -46,16 +38,8 @@ def compute_experiment_exposures(experiment_id: int) -> None: window_start=experiment.started_at, window_end=as_of, ) - except Exception as exc: + except Exception: exposures.record_failure() - log.error("exposures.compute_failed", exc_info=exc) return exposures.record_refresh(summary, as_of) - log.info( - "exposures.computed", - identities__count=sum( - sum(point.new_identities.values()) for point in summary.timeseries.points - ), - excluded_identities__count=summary.excluded_identities, - ) diff --git a/api/tests/unit/experimentation/test_tasks.py b/api/tests/unit/experimentation/test_tasks.py index 58159dbce2fa..8ca9bd4ccf62 100644 --- a/api/tests/unit/experimentation/test_tasks.py +++ b/api/tests/unit/experimentation/test_tasks.py @@ -5,7 +5,6 @@ from django.utils import timezone from freezegun import freeze_time from pytest_mock import MockerFixture -from pytest_structlog import StructuredLogCapture from experimentation.dataclasses import ( ExposuresSummary, @@ -73,7 +72,6 @@ def _summary() -> ExposuresSummary: def test_compute_experiment_exposures__running_experiment__stores_summary( experiment: Experiment, mocker: MockerFixture, - log: StructuredLogCapture, ) -> None: # Given a running experiment and a warehouse responding with a summary experiment.status = ExperimentStatus.RUNNING @@ -98,17 +96,6 @@ def test_compute_experiment_exposures__running_experiment__stores_summary( assert exposures.payload == asdict(_summary()) assert exposures.as_of == timezone.now() assert exposures.last_error_at is None - assert log.events == [ - { - "level": "info", - "event": "exposures.computed", - "experiment__id": experiment.id, - "environment__id": experiment.environment_id, - "organisation__id": experiment.environment.project.organisation_id, - "identities__count": 10, - "excluded_identities__count": 1, - } - ] def test_compute_experiment_exposures__completed_experiment__window_ends_at_ended_at( @@ -142,7 +129,6 @@ def test_compute_experiment_exposures__completed_experiment__window_ends_at_ende def test_compute_experiment_exposures__warehouse_error__records_failure( experiment: Experiment, mocker: MockerFixture, - log: StructuredLogCapture, ) -> None: # Given a running experiment whose row holds a previously computed payload experiment.status = ExperimentStatus.RUNNING @@ -167,13 +153,6 @@ def test_compute_experiment_exposures__warehouse_error__records_failure( assert exposures.last_error_at is not None assert exposures.payload == asdict(_summary()) assert exposures.as_of == as_of - assert log.has( - "exposures.compute_failed", - level="error", - experiment__id=experiment.id, - environment__id=experiment.environment_id, - organisation__id=experiment.environment.project.organisation_id, - ) def test_compute_experiment_exposures__not_started_experiment__skips( diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index 14d7d21e8a5c..c0e1d4d35f5a 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -79,29 +79,6 @@ Attributes: - `environment_api_key` - `environment_id` -### `experimentation.exposures.compute_failed` - -Logged at `error` from: - - `api/experimentation/tasks.py:51` - -Attributes: - - `environment.id` - - `exc_info` - - `experiment.id` - - `organisation.id` - -### `experimentation.exposures.computed` - -Logged at `info` from: - - `api/experimentation/tasks.py:55` - -Attributes: - - `environment.id` - - `excluded_identities.count` - - `experiment.id` - - `identities.count` - - `organisation.id` - ### `feature_health.feature_health_event_dismissal_not_supported` Logged at `warning` from: From 85e3a9fe031789ef39a5e366f100ddc07ea5f8df Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Mon, 15 Jun 2026 12:17:06 +0530 Subject: [PATCH 5/6] feat(experimentation): log exposure compute failures --- api/experimentation/tasks.py | 14 ++++++++++++-- api/tests/unit/experimentation/test_tasks.py | 10 ++++++++++ .../observability/_events-catalogue.md | 11 +++++++++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/api/experimentation/tasks.py b/api/experimentation/tasks.py index 9cde410bed10..302ef3053aff 100644 --- a/api/experimentation/tasks.py +++ b/api/experimentation/tasks.py @@ -1,3 +1,4 @@ +import structlog from django.utils import timezone from task_processor.decorators import register_task_handler @@ -5,6 +6,8 @@ from experimentation.models import Experiment, ExperimentExposures from experimentation.services import compute_exposures_summary +logger = structlog.get_logger("experimentation") + @register_task_handler() def add_environment_key_to_ingestion(environment_api_key: str) -> None: @@ -19,7 +22,7 @@ def delete_environment_key_from_ingestion(environment_api_key: str) -> None: @register_task_handler() def compute_experiment_exposures(experiment_id: int) -> None: experiment = ( - Experiment.objects.select_related("environment", "feature") + Experiment.objects.select_related("environment__project", "feature") .filter(id=experiment_id) .first() ) @@ -38,8 +41,15 @@ def compute_experiment_exposures(experiment_id: int) -> None: window_start=experiment.started_at, window_end=as_of, ) - except Exception: + except Exception as exc: exposures.record_failure() + logger.error( + "exposures.compute_failed", + exc_info=exc, + experiment__id=experiment.id, + environment__id=experiment.environment_id, + organisation__id=experiment.environment.project.organisation_id, + ) return exposures.record_refresh(summary, as_of) diff --git a/api/tests/unit/experimentation/test_tasks.py b/api/tests/unit/experimentation/test_tasks.py index 8ca9bd4ccf62..a465c24bb4b1 100644 --- a/api/tests/unit/experimentation/test_tasks.py +++ b/api/tests/unit/experimentation/test_tasks.py @@ -5,6 +5,7 @@ from django.utils import timezone from freezegun import freeze_time from pytest_mock import MockerFixture +from pytest_structlog import StructuredLogCapture from experimentation.dataclasses import ( ExposuresSummary, @@ -129,6 +130,7 @@ def test_compute_experiment_exposures__completed_experiment__window_ends_at_ende def test_compute_experiment_exposures__warehouse_error__records_failure( experiment: Experiment, mocker: MockerFixture, + log: StructuredLogCapture, ) -> None: # Given a running experiment whose row holds a previously computed payload experiment.status = ExperimentStatus.RUNNING @@ -153,6 +155,14 @@ def test_compute_experiment_exposures__warehouse_error__records_failure( assert exposures.last_error_at is not None assert exposures.payload == asdict(_summary()) assert exposures.as_of == as_of + # And the failure is logged for operators + assert log.has( + "exposures.compute_failed", + level="error", + experiment__id=experiment.id, + environment__id=experiment.environment_id, + organisation__id=experiment.environment.project.organisation_id, + ) def test_compute_experiment_exposures__not_started_experiment__skips( diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index c0e1d4d35f5a..24a48df0d14f 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -79,6 +79,17 @@ Attributes: - `environment_api_key` - `environment_id` +### `experimentation.exposures.compute_failed` + +Logged at `error` from: + - `api/experimentation/tasks.py:46` + +Attributes: + - `environment.id` + - `exc_info` + - `experiment.id` + - `organisation.id` + ### `feature_health.feature_health_event_dismissal_not_supported` Logged at `warning` from: From 242818d8aef88ab6d57ab89d9c551a6c3a879df9 Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Mon, 15 Jun 2026 12:19:47 +0530 Subject: [PATCH 6/6] refactor(experimentation): drop is_final comment --- api/experimentation/models.py | 1 - 1 file changed, 1 deletion(-) diff --git a/api/experimentation/models.py b/api/experimentation/models.py index acaf228fa85e..5fe779007fe8 100644 --- a/api/experimentation/models.py +++ b/api/experimentation/models.py @@ -147,7 +147,6 @@ class ExperimentExposures(models.Model): @property def is_final(self) -> bool: - # Recomputing a final row can only lose data: warehouse events expire. ended_at = self.experiment.ended_at return ( ended_at is not None and self.as_of is not None and self.as_of >= ended_at