diff --git a/api/integrations/launch_darkly/services.py b/api/integrations/launch_darkly/services.py index 6710ff4d84af..a8db41617da4 100644 --- a/api/integrations/launch_darkly/services.py +++ b/api/integrations/launch_darkly/services.py @@ -39,6 +39,7 @@ from integrations.launch_darkly.types import Clause from projects.models import Project from projects.tags.models import Tag +from segment_membership.services import enqueue_membership_refresh from segments.models import Condition, Segment, SegmentRule from users.models import FFAdminUser from util.db import closing_stale_connections @@ -1179,3 +1180,6 @@ def process_import_request( import_request.status["deprecated_flag_count"] = sum( 1 for ld_flag in ld_flags if ld_flag["deprecated"] ) + + # Refresh membership counts for the segments the import just created. + enqueue_membership_refresh(import_request.project) diff --git a/api/segment_membership/services.py b/api/segment_membership/services.py index 9be17c1baee9..57f45c471116 100644 --- a/api/segment_membership/services.py +++ b/api/segment_membership/services.py @@ -7,6 +7,7 @@ from flag_engine.context.types import EvaluationContext from flagsmith_sql_flag_engine import TranslateContext, translate_segment from flagsmith_sql_flag_engine.dialects import ClickHouseDialect +from task_processor.models import Task from integrations.flagsmith.client import get_openfeature_client from organisations.models import Organisation @@ -28,6 +29,29 @@ def is_membership_enabled(organisation: Organisation) -> bool: ) +def enqueue_membership_refresh(project: Project) -> None: + """Queue a per-project segment membership count refresh after a canonical + segment is created or edited. + + No-op when the org has the feature off, or when a refresh for the project + is already pending or running. + """ + if not is_membership_enabled(project.organisation): + return + + from segment_membership.tasks import refresh_project_segment_counts + + if Task.objects.filter( + task_identifier=refresh_project_segment_counts.task_identifier, + completed=False, + num_failures__lt=3, + serialized_args=Task.serialize_data((project.id,)), + ).exists(): + return + + refresh_project_segment_counts.delay(args=(project.id,)) + + @contextmanager def open_clickhouse_cursor( *, log_comment: str | None = None diff --git a/api/segments/serializers.py b/api/segments/serializers.py index fbd4d46fd611..a54142c3e0ef 100644 --- a/api/segments/serializers.py +++ b/api/segments/serializers.py @@ -11,6 +11,7 @@ from metadata.serializers import MetadataSerializer, MetadataSerializerMixin from projects.models import Project from segment_membership.models import SegmentMembershipCount +from segment_membership.services import enqueue_membership_refresh from segments.models import Condition, Segment, SegmentRule logger = structlog.get_logger(__name__) @@ -145,6 +146,7 @@ def create(self, validated_data: dict[str, Any]): # type: ignore[no-untyped-def metadata_data = validated_data.pop("metadata", []) segment = super().create(validated_data) # type: ignore[no-untyped-call] self._update_metadata(segment, metadata_data) + enqueue_membership_refresh(segment.project) return segment def update(self, segment: Segment, validated_data: dict[str, Any]): # type: ignore[no-untyped-def] @@ -159,6 +161,7 @@ def update(self, segment: Segment, validated_data: dict[str, Any]): # type: ign ) segment = super().update(segment, validated_data) # type: ignore[no-untyped-call] self._update_metadata(segment, metadata) + enqueue_membership_refresh(segment.project) return segment def _get_rules_and_conditions_without_deleted( diff --git a/api/segments/views.py b/api/segments/views.py index e0b29b1d05ab..f8d459433154 100644 --- a/api/segments/views.py +++ b/api/segments/views.py @@ -22,6 +22,7 @@ ) from features.versioning.models import EnvironmentFeatureVersion from projects.models import Project +from segment_membership.services import enqueue_membership_refresh from .models import Segment from .permissions import SegmentPermissions @@ -180,6 +181,7 @@ def clone(self, request: Request, *args: Any, **kwargs: Any) -> Response: serializer = CloneSegmentSerializer(data=request.data) serializer.is_valid(raise_exception=True) clone = source_segment.clone(name=serializer.validated_data["name"]) + enqueue_membership_refresh(clone.project) return Response(SegmentSerializer(clone).data, status=status.HTTP_201_CREATED) diff --git a/api/tests/integration/segments/test_segment_membership_refresh.py b/api/tests/integration/segments/test_segment_membership_refresh.py new file mode 100644 index 000000000000..535c37f836ee --- /dev/null +++ b/api/tests/integration/segments/test_segment_membership_refresh.py @@ -0,0 +1,96 @@ +import json + +from django.urls import reverse +from pytest_mock import MockerFixture +from rest_framework import status +from rest_framework.test import APIClient + +from projects.models import Project + + +def test_update_segment__edit__enqueues_membership_refresh( + admin_client: APIClient, + project: int, + segment: int, + mocker: MockerFixture, +) -> None: + # Given + enqueue_membership_refresh_mock = mocker.patch( + "segments.serializers.enqueue_membership_refresh" + ) + url = reverse( + "api-v1:projects:project-segments-detail", + args=[project, segment], + ) + data = { + "name": "renamed", + "project": project, + "rules": [{"type": "ALL", "rules": [], "conditions": []}], + } + + # When + response = admin_client.put( + url, data=json.dumps(data), content_type="application/json" + ) + + # Then + assert response.status_code == status.HTTP_200_OK + enqueue_membership_refresh_mock.assert_called_once_with( + Project.objects.get(pk=project) + ) + + +def test_create_segment__new_segment__enqueues_membership_refresh( + admin_client: APIClient, + project: int, + mocker: MockerFixture, +) -> None: + # Given + enqueue_membership_refresh_mock = mocker.patch( + "segments.serializers.enqueue_membership_refresh" + ) + url = reverse("api-v1:projects:project-segments-list", args=[project]) + data = { + "name": "new-segment", + "project": project, + "rules": [{"type": "ALL", "rules": [], "conditions": []}], + } + + # When + response = admin_client.post( + url, data=json.dumps(data), content_type="application/json" + ) + + # Then + assert response.status_code == status.HTTP_201_CREATED + enqueue_membership_refresh_mock.assert_called_once_with( + Project.objects.get(pk=project) + ) + + +def test_clone_segment__clone__enqueues_membership_refresh( + admin_client: APIClient, + project: int, + segment: int, + mocker: MockerFixture, +) -> None: + # Given + enqueue_membership_refresh_mock = mocker.patch( + "segments.views.enqueue_membership_refresh" + ) + url = reverse( + "api-v1:projects:project-segments-clone", + args=[project, segment], + ) + data = {"name": "cloned-segment"} + + # When + response = admin_client.post( + url, data=json.dumps(data), content_type="application/json" + ) + + # Then + assert response.status_code == status.HTTP_201_CREATED + enqueue_membership_refresh_mock.assert_called_once_with( + Project.objects.get(pk=project) + ) diff --git a/api/tests/unit/integrations/launch_darkly/test_services.py b/api/tests/unit/integrations/launch_darkly/test_services.py index dfd223250b0d..56f5b5b06c03 100644 --- a/api/tests/unit/integrations/launch_darkly/test_services.py +++ b/api/tests/unit/integrations/launch_darkly/test_services.py @@ -9,6 +9,7 @@ from django.conf import settings from django.core import signing from flag_engine.segments import constants as segment_constants +from pytest_mock import MockerFixture from requests.exceptions import HTTPError, RequestException, Timeout from environments.identities.models import Identity @@ -646,3 +647,23 @@ def test_serialize_variation_value__various_types__returns_expected( # Then assert result == expected + + +@pytest.mark.django_db(transaction=True) +def test_process_import_request__import__enqueues_membership_refresh( + import_request: LaunchDarklyImportRequest, + project: Project, + mocker: MockerFixture, +) -> None: + # Given + enqueue_membership_refresh_mock = mocker.patch( + "integrations.launch_darkly.services.enqueue_membership_refresh" + ) + + # When + # the import (which bulk-creates segments) completes + process_import_request(import_request) + + # Then + # it triggers a single membership refresh for the imported project + enqueue_membership_refresh_mock.assert_called_once_with(project) diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_services.py b/api/tests/unit/segment_membership/test_unit_segment_membership_services.py index fe33d3d74554..9138b8f18914 100644 --- a/api/tests/unit/segment_membership/test_unit_segment_membership_services.py +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_services.py @@ -1,5 +1,7 @@ from unittest.mock import MagicMock +from common.test_tools import RunTasksFixture +from pytest_django.fixtures import SettingsWrapper from pytest_mock import MockerFixture from environments.models import Environment @@ -7,10 +9,12 @@ from projects.models import Project from segment_membership.services import ( compute_segment_counts_for_project, + enqueue_membership_refresh, get_projects_to_process, is_membership_enabled, open_clickhouse_cursor, ) +from segment_membership.tasks import refresh_project_segment_counts from segments.models import Segment, SegmentRule from tests.types import EnableFeaturesFixture @@ -220,3 +224,113 @@ def test_compute_segment_counts_for_project__untranslatable_segment__skips( # Then assert result == [] cursor.execute.assert_not_called() + + +def test_enqueue_membership_refresh__flag_on__enqueues_refresh( + run_tasks: RunTasksFixture, + mocker: MockerFixture, + settings: SettingsWrapper, + project: Project, + segment: Segment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + # an org with the flag on + enable_features("segment_membership_inspection") + settings.CLICKHOUSE_ENABLED = True + mocker.patch("segment_membership.tasks.open_clickhouse_cursor") + compute_segment_counts_for_project_mock = mocker.patch( + "segment_membership.tasks.compute_segment_counts_for_project", + return_value=[], + ) + + # When + enqueue_membership_refresh(project) + run_tasks(num_tasks=2) + + # Then + # exactly one refresh runs, for the project + compute_segment_counts_for_project_mock.assert_called_once_with(project, mocker.ANY) + + +def test_enqueue_membership_refresh__flag_off__does_not_enqueue( + run_tasks: RunTasksFixture, + mocker: MockerFixture, + settings: SettingsWrapper, + project: Project, +) -> None: + # Given + # the org's flag is off + settings.CLICKHOUSE_ENABLED = True + mocker.patch("segment_membership.tasks.open_clickhouse_cursor") + compute_segment_counts_for_project_mock = mocker.patch( + "segment_membership.tasks.compute_segment_counts_for_project", + return_value=[], + ) + + # When + enqueue_membership_refresh(project) + run_tasks(num_tasks=1) + + # Then + # no refresh runs + compute_segment_counts_for_project_mock.assert_not_called() + + +def test_enqueue_membership_refresh__refresh_already_pending__debounces( + run_tasks: RunTasksFixture, + mocker: MockerFixture, + settings: SettingsWrapper, + project: Project, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + # a refresh for the same project is already pending + enable_features("segment_membership_inspection") + settings.CLICKHOUSE_ENABLED = True + mocker.patch("segment_membership.tasks.open_clickhouse_cursor") + compute_segment_counts_for_project_mock = mocker.patch( + "segment_membership.tasks.compute_segment_counts_for_project", + return_value=[], + ) + refresh_project_segment_counts.delay(args=(project.id,)) + + # When + enqueue_membership_refresh(project) + run_tasks(num_tasks=2) + + # Then + # the call did not enqueue a second refresh -- only the pending one ran + compute_segment_counts_for_project_mock.assert_called_once_with(project, mocker.ANY) + + +def test_enqueue_membership_refresh__pending_for_other_project__still_enqueues( + run_tasks: RunTasksFixture, + mocker: MockerFixture, + settings: SettingsWrapper, + project: Project, + project_b: Project, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + # a refresh is pending for a different project only + enable_features("segment_membership_inspection") + settings.CLICKHOUSE_ENABLED = True + mocker.patch("segment_membership.tasks.open_clickhouse_cursor") + compute_segment_counts_for_project_mock = mocker.patch( + "segment_membership.tasks.compute_segment_counts_for_project", + return_value=[], + ) + refresh_project_segment_counts.delay(args=(project_b.id,)) + + # When + enqueue_membership_refresh(project) + run_tasks(num_tasks=3) + + # Then + # the debounce is scoped per project: both refreshes run + assert compute_segment_counts_for_project_mock.call_count == 2 + compute_segment_counts_for_project_mock.assert_has_calls( + [mocker.call(project, mocker.ANY), mocker.call(project_b, mocker.ANY)], + any_order=True, + ) diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index e3a2fdfc6787..b21521e825fc 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -376,7 +376,7 @@ Attributes: ### `segment_membership.compute.segment.skipped` Logged at `error` from: - - `api/segment_membership/services.py:96` + - `api/segment_membership/services.py:120` Attributes: - `project.id` @@ -414,7 +414,7 @@ Attributes: ### `segments.serializers.segment_revision_created` Logged at `info` from: - - `api/segments/serializers.py:155` + - `api/segments/serializers.py:157` Attributes: - `revision_id`