Skip to content
17 changes: 17 additions & 0 deletions api/clickhouse/migrations/0002_identities_add_is_deleted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from django.db import migrations

_ADD_COLUMN_DDL = (
"ALTER TABLE IDENTITIES ADD COLUMN IF NOT EXISTS is_deleted Bool DEFAULT false"
)


class Migration(migrations.Migration):
# ClickHouse has no transactional DDL.
atomic = False
dependencies = [("clickhouse", "0001_create_identities")]
operations = [
migrations.RunSQL(
_ADD_COLUMN_DDL,
reverse_sql=("ALTER TABLE IDENTITIES DROP COLUMN IF EXISTS is_deleted"),
)
]
13 changes: 13 additions & 0 deletions api/edge_api/identities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import typing
from contextlib import suppress

from django.conf import settings
from django.db.models import Prefetch, Q

from api_keys.user import APIKeyUser
Expand Down Expand Up @@ -194,6 +195,18 @@ def delete(self, user: FFAdminUser | APIKeyUser = None) -> None: # type: ignore
user=user,
)
self._reset_initial_state() # type: ignore[no-untyped-call]
if settings.CLICKHOUSE_ENABLED:
from segment_membership.tasks import (
write_identity_deletion_tombstone_to_clickhouse,
)

write_identity_deletion_tombstone_to_clickhouse.delay(
args=(
self.engine_identity_model.environment_api_key,
self.engine_identity_model.identifier,
self.engine_identity_model.composite_key,
)
)

def synchronise_features(self, valid_feature_names: typing.Collection[str]) -> None:
identity_feature_names = {
Expand Down
9 changes: 6 additions & 3 deletions api/segment_membership/mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

from flagsmith_schemas import dynamodb

# (environment_id, identifier, identity_key, traits)
ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None]
# (environment_id, identifier, identity_key, traits, is_deleted)
ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None, bool]


def map_identity_document_to_clickhouse_row(
env_key: str,
identity_doc: dynamodb.Identity,
*,
is_deleted: bool = False,
) -> ClickHouseIdentityRow:
"""Project a Dynamo identity document onto an IDENTITIES row tuple
`(environment_id, identifier, identity_key, traits)`."""
`(environment_id, identifier, identity_key, traits, is_deleted)`."""
identifier = identity_doc["identifier"]
composite_key = identity_doc["composite_key"]
raw_traits = identity_doc.get("identity_traits")
Expand All @@ -21,6 +23,7 @@ def map_identity_document_to_clickhouse_row(
identifier,
composite_key,
traits,
is_deleted,
)


Expand Down
4 changes: 3 additions & 1 deletion api/segment_membership/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ def compute_segment_counts_for_project(
f"SELECT {seg.id} AS segment_id, "
f"i.environment_id AS env_key, count() AS c "
f"FROM IDENTITIES AS i FINAL "
f"WHERE i.environment_id IN %(env_keys)s AND ({predicate}) "
f"WHERE i.environment_id IN %(env_keys)s "
f"AND i.is_deleted = false "
f"AND ({predicate}) "
f"GROUP BY i.environment_id"
)

Expand Down
61 changes: 61 additions & 0 deletions api/segment_membership/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)

from environments.dynamodb.wrappers.identity_wrapper import DynamoIdentityWrapper
from environments.models import Environment
from projects.models import Project
from segment_membership.mappers import map_identity_document_to_clickhouse_row
from segment_membership.metrics import (
Expand Down Expand Up @@ -45,6 +46,7 @@
"identifier",
"identity_key",
"traits",
"is_deleted",
)

_INSERT_IDENTITIES_SQL = (
Expand Down Expand Up @@ -188,3 +190,62 @@ def refresh_project_segment_counts(project_id: int) -> None:
membership_counts__count=len(membership_counts),
stale_counts__count=stale_deleted,
)


@register_task_handler()
def write_identity_deletion_tombstone_to_clickhouse(
env_key: str,
identifier: str,
identity_key: str,
) -> None:
"""Insert a tombstone row for a deleted identity so it is excluded from
segment membership counts at the next refresh.

ReplacingMergeTree(inserted_at) keeps the row with the highest
inserted_at per (environment_id, identifier). Because this row is
written after the identity is removed from Dynamo its inserted_at
will be newer than any prior live row, so FINAL deduplication will
always surface the tombstone.
"""
if not settings.CLICKHOUSE_ENABLED:
logger.info(
"tombstone.skipped",
reason="clickhouse_not_configured",
env_key=env_key,
identifier=identifier,
)
return
Comment on lines +210 to +217

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In a multi-tenant environment, writing tombstone rows for every deleted identity across all organizations—even those that do not have segment_membership_inspection enabled—will lead to unnecessary ClickHouse write amplification and table bloat. We should check if the organization has segment membership enabled before writing the tombstone.

    if not settings.CLICKHOUSE_ENABLED:
        logger.info(
            "tombstone.skipped",
            reason="clickhouse_not_configured",
            env_key=env_key,
            identifier=identifier,
        )
        return

    from environments.models import Environment

    try:
        environment = Environment.objects.select_related(
            "project__organisation"
        ).get(api_key=env_key)
    except Environment.DoesNotExist:
        logger.info(
            "tombstone.skipped",
            reason="environment_not_found",
            env_key=env_key,
            identifier=identifier,
        )
        return

    if not is_membership_enabled(environment.project.organisation):
        logger.info(
            "tombstone.skipped",
            reason="segment_membership_disabled",
            env_key=env_key,
            identifier=identifier,
        )
        return

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have addressed this raised concern.


try:
environment = Environment.objects.select_related("project__organisation").get(
api_key=env_key
)
except Environment.DoesNotExist:
logger.info(
"tombstone.skipped",
reason="environment_not_found",
env_key=env_key,
identifier=identifier,
)
return

if not is_membership_enabled(environment.project.organisation):
logger.info(
"tombstone.skipped",
reason="segment_membership_disabled",
env_key=env_key,
identifier=identifier,
)
return

log_comment = f"flagsmith:segment_membership:tombstone:env_{env_key}"
with open_clickhouse_cursor(log_comment=log_comment) as cursor:
cursor.executemany(
_INSERT_IDENTITIES_SQL,
[(env_key, identifier, identity_key, None, True)],
)
logger.info(
"tombstone.written",
env_key=env_key,
identifier=identifier,
)
51 changes: 51 additions & 0 deletions api/tests/unit/edge_api/identities/test_edge_identity_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.utils import timezone
from freezegun import freeze_time
from pytest_django import DjangoAssertNumQueries
from pytest_django.fixtures import SettingsWrapper
from pytest_lazyfixture import lazy_fixture # type: ignore[import-untyped]
from pytest_mock import MockerFixture

Expand Down Expand Up @@ -509,6 +510,56 @@ def test_save__feature_override_updated__generates_audit_records(
)


def test_edge_identity_delete__clickhouse_enabled__dispatches_tombstone_task(
mocker: MockerFixture,
edge_identity_model: EdgeIdentity,
edge_identity_dynamo_wrapper_mock: MagicMock,
settings: SettingsWrapper,
) -> None:
# Given
settings.CLICKHOUSE_ENABLED = True
mock_tombstone_task = mocker.MagicMock()
mocker.patch(
"segment_membership.tasks.write_identity_deletion_tombstone_to_clickhouse",
mock_tombstone_task,
)

# When
edge_identity_model.delete()

# Then
edge_identity_dynamo_wrapper_mock.delete_item.assert_called_once()
mock_tombstone_task.delay.assert_called_once_with(
args=(
edge_identity_model.environment_api_key,
edge_identity_model.identifier,
edge_identity_model.engine_identity_model.composite_key,
)
)


def test_edge_identity_delete__clickhouse_disabled__no_tombstone_dispatched(
mocker: MockerFixture,
edge_identity_model: EdgeIdentity,
edge_identity_dynamo_wrapper_mock: MagicMock,
settings: SettingsWrapper,
) -> None:
# Given
settings.CLICKHOUSE_ENABLED = False
mock_tombstone_task = mocker.MagicMock()
mocker.patch(
"segment_membership.tasks.write_identity_deletion_tombstone_to_clickhouse",
mock_tombstone_task,
)

# When
edge_identity_model.delete()

# Then
edge_identity_dynamo_wrapper_mock.delete_item.assert_called_once()
mock_tombstone_task.delay.assert_not_called()


def test_get_all_feature_states__post_v2_versioning_migration__returns_latest_overrides(
environment: Environment,
feature: Feature,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
{"trait_key": "plan", "trait_value": "growth"},
],
},
("env-key", "alice", "env_x_alice", {"plan": "growth"}),
("env-key", "alice", "env_x_alice", {"plan": "growth"}, False),
id="single string trait",
),
pytest.param(
Expand All @@ -34,7 +34,7 @@
"created_date": "2026-05-08T00:00:00Z",
"identity_traits": [],
},
("env-key", "alice", "env_x_alice", None),
("env-key", "alice", "env_x_alice", None, False),
id="empty traits collapse to NULL",
),
pytest.param(
Expand All @@ -48,7 +48,7 @@
{"trait_key": "age", "trait_value": Decimal("18")},
],
},
("env-key", "alice", "env_x_alice", {"age": 18}),
("env-key", "alice", "env_x_alice", {"age": 18}, False),
id="whole-number Decimal narrows to int",
),
pytest.param(
Expand All @@ -62,7 +62,7 @@
{"trait_key": "score", "trait_value": Decimal("1.5")},
],
},
("env-key", "alice", "env_x_alice", {"score": 1.5}),
("env-key", "alice", "env_x_alice", {"score": 1.5}, False),
id="fractional Decimal narrows to float",
),
pytest.param(
Expand All @@ -82,16 +82,35 @@
"alice",
"env_x_alice",
{"plan": "growth", "team": "alpha"},
False,
),
id="multiple traits flatten to a single dict",
),
],
)
def test_map_identity_document_to_clickhouse_row__cases__return_expected(
doc: DynamoIdentity,
expected: tuple[str, str, str, dict[str, object] | None],
expected: tuple[str, str, str, dict[str, object] | None, bool],
) -> None:
# Given a Dynamo identity document
# When mapped onto an IDENTITIES row
# Then it lines up positionally with the IDENTITIES schema
assert map_identity_document_to_clickhouse_row("env-key", doc) == expected


def test_map_identity_document_to_clickhouse_row__is_deleted_true__sets_flag() -> None:
# Given a Dynamo identity document and is_deleted=True
doc: DynamoIdentity = {
"identity_uuid": UUID_A,
"identifier": "alice",
"environment_api_key": "env-key",
"composite_key": "env_x_alice",
"created_date": "2026-05-08T00:00:00Z",
"identity_traits": [],
}

# When mapped with is_deleted=True
result = map_identity_document_to_clickhouse_row("env-key", doc, is_deleted=True)

# Then the flag is set in the returned tuple
assert result == ("env-key", "alice", "env_x_alice", None, True)
Loading
Loading