Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
SCHEMA >
`segmentId` String,
`memberId` String,
`tenantId` String,
`activityCountState` AggregateFunction(count, String),
`lastActiveState` AggregateFunction(max, DateTime64(3)),
`activityTypesState` AggregateFunction(groupArrayDistinct, String),
`activeOnState` AggregateFunction(groupArrayDistinct, String),
`averageSentimentState` AggregateFunction(avg, Int8),
`lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)),
`updatedAt` DateTime64(3)

ENGINE AggregatingMergeTree
ENGINE_PARTITION_KEY toYear(updatedAt)
ENGINE_SORTING_KEY segmentId, memberId
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
SCHEMA >
`segmentId` String,
`organizationId` String,
`tenantId` String,
`joinedAtState` AggregateFunction(min, DateTime64(3)),
`lastActiveState` AggregateFunction(max, DateTime64(3)),
`activeOnState` AggregateFunction(groupArrayDistinct, String),
`activityCountState` AggregateFunction(count, String),
`memberCountState` AggregateFunction(countDistinct, String),
`avgContributorEngagement` AggregateFunction(avg, Int8),
`lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)),
`updatedAt` DateTime64(3)

ENGINE AggregatingMergeTree
ENGINE_PARTITION_KEY toYear(updatedAt)
ENGINE_SORTING_KEY segmentId, organizationId
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
NODE members_with_changed_aggs_previous_day
SQL >
select distinct memberId
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)

NODE segments_with_changed_aggs_previous_day
SQL >
select id as segmentId
from segments
where
grandparentId in (
select grandparentId
from segments
where
id in (
select distinct segmentId
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing filter for empty parentId/grandparentId causes incorrect aggregation

High Severity

The segments datasource defaults parentId and grandparentId to empty string for top-level segments. The parent and grandparent aggregation queries don't filter out empty values, causing all segments without parents/grandparents to be aggregated together into a single row with segmentId = ''. This combines unrelated data and produces invalid segment identifiers for Kafka export.

Additional Locations (2)

Fix in Cursor Fix in Web


NODE grandparent_segment_aggs_updated_previous_day
SQL >
SELECT
grandparentId as segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive,
groupArrayDistinctMerge(activityTypesState) AS activityTypes,
groupArrayDistinctMerge(activeOnState) AS activeOn,
avgMerge(averageSentimentState) AS averageSentiment,
now() AS updatedAt
FROM cdp_member_segment_aggregates_ds as cdp_aggs
join segments s on s.id = cdp_aggs.segmentId
where
cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day)
and memberId in (select memberId from members_with_changed_aggs_previous_day)
GROUP BY grandparentId, memberId

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 30 1 * * *
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
NODE leaf_segment_aggs_updated_previous_day
SQL >
%
SELECT
segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive,
groupArrayDistinctMerge(activityTypesState) AS activityTypes,
groupArrayDistinctMerge(activeOnState) AS activeOn,
avgMerge(averageSentimentState) AS averageSentiment,
now() as updatedAt
FROM cdp_member_segment_aggregates_ds
WHERE
(memberId, segmentId) in (
select distinct memberId, segmentId
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
GROUP BY segmentId, memberId, updatedAt
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GROUP BY includes updatedAt causing incorrect aggregation

High Severity

The GROUP BY clause includes updatedAt from the table column, which will produce multiple rows per (segmentId, memberId/organizationId) pair when there are multiple distinct updatedAt values. The parent and grandparent segment pipes correctly use GROUP BY parentId, memberId without updatedAt. The updatedAt in GROUP BY references the table column, not the now() alias, causing partial instead of full aggregations.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GROUP BY includes updatedAt causing incorrect aggregation

High Severity

The GROUP BY clause includes updatedAt, which prevents proper merging of aggregate states. Unlike the parent and grandparent segment pipes which group only by segmentId, memberId, this causes each distinct updatedAt value in the source table to produce a separate row with partial aggregation results. The query outputs now() as updatedAt, but the GROUP BY references the source column, resulting in multiple incorrectly aggregated rows per entity being sent to Kafka.

Additional Locations (1)

Fix in Cursor Fix in Web


TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 0 1 * * *
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
NODE members_with_changed_aggs_previous_day
SQL >
select distinct memberId
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)

NODE segments_with_changed_aggs_previous_day
SQL >
select id as segmentId
from segments
where
parentId in (
select parentId
from segments
where
id in (
select distinct segmentId
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
)

NODE cdp_member_aggregates_sink_daily_parent_segments_1
SQL >
%
SELECT
parentId as segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive,
groupArrayDistinctMerge(activityTypesState) AS activityTypes,
groupArrayDistinctMerge(activeOnState) AS activeOn,
avgMerge(averageSentimentState) AS averageSentiment,
now() AS updatedAt
FROM cdp_member_segment_aggregates_ds as cdp_aggs
join segments s on s.id = cdp_aggs.segmentId
where
cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day)
and memberId in (select memberId from members_with_changed_aggs_previous_day)
GROUP BY parentId, memberId

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 0 1 * * *
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
NODE grandparent_segment_aggs
SQL >
%
SELECT
grandparentId as segmentId,
cdp_aggs.memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(cdp_aggs.activityCountState) AS activityCount,
maxMerge(cdp_aggs.lastActiveState) AS lastActive,
groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes,
groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn,
avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment,
now() AS updatedAt
FROM cdp_member_segment_aggregates_ds cdp_aggs
join segments s on s.id = cdp_aggs.segmentId
{% if defined(bucket_id) %}
WHERE
cityHash64(grandparentId) % 5
= {{
UInt8(
bucket_id,
0,
description="This is bucket id of the activity segment",
required=False,
)
}}
{% end %}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Backfillers include NULL parent/grandparent IDs without bucket_id

Medium Severity

When bucket_id is not defined, the member backfiller pipes have no WHERE clause and will include segments with NULL parentId or grandparentId, producing records with NULL segmentId. The daily changed segments sinks filter these out via their IN subqueries (since NULL doesn't match IN clauses), creating inconsistent behavior between backfill and incremental exports.

Additional Locations (1)

Fix in Cursor Fix in Web

GROUP BY grandparentId, memberId

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE @on-demand
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
NODE leaf_segment_aggregates
SQL >
%
SELECT
cdp_aggs.segmentId,
cdp_aggs.memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(cdp_aggs.activityCountState) AS activityCount,
maxMerge(cdp_aggs.lastActiveState) AS lastActive,
groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes,
groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn,
avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment,
now() AS updatedAt
FROM cdp_member_segment_aggregates_ds cdp_aggs
{% if defined(bucket_id) %}
WHERE
cityHash64(segmentId) % 5
= {{
UInt8(
bucket_id,
0,
description="This is bucket id of the activity segment",
required=False,
)
}}
{% end %}
GROUP BY segmentId, memberId

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE @on-demand
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
NODE parent_segment_aggregates
SQL >
%
SELECT
parentId as segmentId,
cdp_aggs.memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(cdp_aggs.activityCountState) AS activityCount,
maxMerge(cdp_aggs.lastActiveState) AS lastActive,
groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes,
groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn,
avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment,
now() AS updatedAt
FROM cdp_member_segment_aggregates_ds cdp_aggs
join segments s on s.id = cdp_aggs.segmentId
{% if defined(bucket_id) %}
WHERE
cityHash64(parentId) % 5
= {{
UInt8(
bucket_id,
0,
description="This is bucket id of the activity segment",
required=False,
)
}}
{% end %}
GROUP BY parentId, memberId

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE @on-demand
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink
18 changes: 18 additions & 0 deletions services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
NODE cdp_member_aggregates_sink_initial_snapshot_0
SQL >
SELECT
segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countState(activityId) AS activityCountState,
maxState(timestamp) AS lastActiveState,
groupArrayDistinctState(type) AS activityTypesState,
groupArrayDistinctState(platform) AS activeOnState,
avgState(sentimentScore) AS averageSentimentState,
maxState(updatedAt) AS lastActivityUpdatedAtState,
now64(3) as updatedAt
FROM activityRelations_enrich_snapshot_MV_ds
GROUP BY segmentId, memberId

TYPE MATERIALIZED
DATASOURCE cdp_member_segment_aggregates_ds
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
NODE cdp_member_aggregates_sink_initial_snapshot_0
SQL >
SELECT
segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countState(activityId) AS activityCountState,
maxState(timestamp) AS lastActiveState,
groupArrayDistinctState(type) AS activityTypesState,
groupArrayDistinctState(platform) AS activeOnState,
avgState(sentimentScore) AS averageSentimentState,
maxState(act.updatedAt) as lastActivityUpdatedAtState,
max(act.updatedAt) as updatedAt
FROM activityRelations_enriched_deduplicated_ds act
GROUP BY segmentId, memberId

TYPE COPY
TARGET_DATASOURCE cdp_member_segment_aggregates_ds
COPY_MODE replace
COPY_SCHEDULE @on-demand
Loading