-
Notifications
You must be signed in to change notification settings - Fork 728
feat: pipes and datasources for CDP aggs (CDP-804) #3714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4357c22
f3e7aa1
ebda0c0
99ad5ac
a075de1
0b4c11f
eea1913
a308758
76c850c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| # Member-level activity aggregates, one logical row per (segmentId, memberId). | ||
| # Presumably this is cdp_member_segment_aggregates_ds: the columns match what the | ||
| # materialized and copy pipes targeting that datasource emit -- confirm against the file name. | ||
| # | ||
| # Every *State column stores a partial aggregate state, not a final value. Readers must | ||
| # finalize it with the matching -Merge combinator (countMerge, maxMerge, | ||
| # groupArrayDistinctMerge, avgMerge) and GROUP BY the key. Because the table is partitioned | ||
| # by toYear(updatedAt), rows for the same key written in different years live in different | ||
| # partitions and are not collapsed by background merges. | ||
| SCHEMA > | ||
| `segmentId` String, | ||
| `memberId` String, | ||
| `tenantId` String, | ||
| `activityCountState` AggregateFunction(count, String), | ||
| `lastActiveState` AggregateFunction(max, DateTime64(3)), | ||
| `activityTypesState` AggregateFunction(groupArrayDistinct, String), | ||
| `activeOnState` AggregateFunction(groupArrayDistinct, String), | ||
| `averageSentimentState` AggregateFunction(avg, Int8), | ||
| `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), | ||
| `updatedAt` DateTime64(3) | ||
|
|
||
| # Rows sharing the sorting key (segmentId, memberId) are merged within a partition. | ||
| ENGINE AggregatingMergeTree | ||
| ENGINE_PARTITION_KEY toYear(updatedAt) | ||
| ENGINE_SORTING_KEY segmentId, memberId |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| # Organization-level activity aggregates, one logical row per (segmentId, organizationId). | ||
| # No pipe visible alongside this file writes to it -- the datasource name above is a guess. | ||
| # | ||
| # All aggregate columns hold partial states and must be read with the matching -Merge | ||
| # combinator. NOTE(review): `avgContributorEngagement` is also an AggregateFunction state | ||
| # but lacks the `State` suffix every other state column uses; consider renaming it before | ||
| # any pipe starts depending on the current name. | ||
| SCHEMA > | ||
| `segmentId` String, | ||
| `organizationId` String, | ||
| `tenantId` String, | ||
| `joinedAtState` AggregateFunction(min, DateTime64(3)), | ||
| `lastActiveState` AggregateFunction(max, DateTime64(3)), | ||
| `activeOnState` AggregateFunction(groupArrayDistinct, String), | ||
| `activityCountState` AggregateFunction(count, String), | ||
| `memberCountState` AggregateFunction(countDistinct, String), | ||
| `avgContributorEngagement` AggregateFunction(avg, Int8), | ||
| `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), | ||
| `updatedAt` DateTime64(3) | ||
|
|
||
| # Rows sharing the sorting key (segmentId, organizationId) are merged within a partition. | ||
| ENGINE AggregatingMergeTree | ||
| ENGINE_PARTITION_KEY toYear(updatedAt) | ||
| ENGINE_SORTING_KEY segmentId, organizationId |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| NODE members_with_changed_aggs_previous_day | ||
| SQL > | ||
|     -- Members having at least one aggregate row whose updatedAt is at or after | ||
|     -- the start of the previous day, evaluated in Europe/Berlin time. | ||
|     SELECT DISTINCT memberId | ||
|     FROM cdp_member_segment_aggregates_ds | ||
|     WHERE updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) | ||
|
|
||
| NODE segments_with_changed_aggs_previous_day | ||
| SQL > | ||
|     -- All segments that share a grandparentId with any segment whose aggregates | ||
|     -- changed since the start of the previous day (Europe/Berlin). Siblings are | ||
|     -- included because the grandparent roll-up has to be recomputed from all of them. | ||
|     SELECT id AS segmentId | ||
|     FROM segments | ||
|     WHERE grandparentId IN ( | ||
|         SELECT grandparentId | ||
|         FROM segments | ||
|         WHERE id IN ( | ||
|             SELECT DISTINCT segmentId | ||
|             FROM cdp_member_segment_aggregates_ds | ||
|             WHERE updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) | ||
|         ) | ||
|     ) | ||
|
|
||
| NODE grandparent_segment_aggs_updated_previous_day | ||
| SQL > | ||
|     -- Rolls member aggregates up to the grandparent segment for every member and | ||
|     -- segment family touched since the previous day, finalizing the stored states | ||
|     -- with the -Merge combinators. The result is exported under the grandparent's id. | ||
|     SELECT | ||
|         grandparentId AS segmentId, | ||
|         memberId, | ||
|         '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
|         countMerge(activityCountState) AS activityCount, | ||
|         maxMerge(lastActiveState) AS lastActive, | ||
|         groupArrayDistinctMerge(activityTypesState) AS activityTypes, | ||
|         groupArrayDistinctMerge(activeOnState) AS activeOn, | ||
|         avgMerge(averageSentimentState) AS averageSentiment, | ||
|         now() AS updatedAt | ||
|     FROM cdp_member_segment_aggregates_ds AS cdp_aggs | ||
|     JOIN segments s ON s.id = cdp_aggs.segmentId | ||
|     WHERE | ||
|         cdp_aggs.segmentId IN (SELECT segmentId FROM segments_with_changed_aggs_previous_day) | ||
|         AND memberId IN (SELECT memberId FROM members_with_changed_aggs_previous_day) | ||
|         -- Segments without a grandparent must not be rolled up: without this filter they | ||
|         -- would all collapse into one group exported with segmentId = ''. | ||
|         -- (segments reportedly defaults grandparentId to '' -- schema not visible here.) | ||
|         AND grandparentId != '' | ||
|     GROUP BY grandparentId, memberId | ||
|
|
||
| # Sink settings: CSV export to Kafka, cron "30 1 * * *" = every day at 01:30 | ||
| # (scheduler time zone is not visible here -- presumably UTC, confirm). | ||
| TYPE SINK | ||
| EXPORT_SERVICE kafka | ||
| EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming | ||
| EXPORT_SCHEDULE 30 1 * * * | ||
| EXPORT_FORMAT csv | ||
| EXPORT_STRATEGY @new | ||
| EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| NODE leaf_segment_aggs_updated_previous_day | ||
| SQL > | ||
|     % | ||
|     -- Finalized aggregates for every (member, leaf segment) pair that has an aggregate | ||
|     -- row with updatedAt at or after the start of the previous day (Europe/Berlin). | ||
|     -- All stored rows of a matching pair are merged, not only the recent ones. | ||
|     SELECT | ||
|         segmentId, | ||
|         memberId, | ||
|         '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
|         countMerge(activityCountState) AS activityCount, | ||
|         maxMerge(lastActiveState) AS lastActive, | ||
|         groupArrayDistinctMerge(activityTypesState) AS activityTypes, | ||
|         groupArrayDistinctMerge(activeOnState) AS activeOn, | ||
|         avgMerge(averageSentimentState) AS averageSentiment, | ||
|         now() AS updatedAt | ||
|     FROM cdp_member_segment_aggregates_ds | ||
|     WHERE | ||
|         (memberId, segmentId) IN ( | ||
|             SELECT DISTINCT memberId, segmentId | ||
|             FROM cdp_member_segment_aggregates_ds | ||
|             WHERE updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) | ||
|         ) | ||
|     -- Group by the key only. `updatedAt` is both a source column and the now() alias | ||
|     -- above; grouping by it can split one pair into several partially merged rows. | ||
|     GROUP BY segmentId, memberId | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. GROUP BY includes `updatedAt`
|
||
|
|
||
| # Sink settings for the node above: CSV export to Kafka, cron "0 1 * * *" = every day | ||
| # at 01:00 (scheduler time zone is not visible here -- presumably UTC, confirm). | ||
| TYPE SINK | ||
| EXPORT_SERVICE kafka | ||
| EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming | ||
| EXPORT_SCHEDULE 0 1 * * * | ||
| EXPORT_FORMAT csv | ||
| EXPORT_STRATEGY @new | ||
| EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| NODE members_with_changed_aggs_previous_day | ||
| SQL > | ||
|     -- Distinct members with an aggregate row whose updatedAt falls at or after | ||
|     -- the start of the previous day in Europe/Berlin time. | ||
|     SELECT DISTINCT memberId | ||
|     FROM cdp_member_segment_aggregates_ds | ||
|     WHERE updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) | ||
|
|
||
| NODE segments_with_changed_aggs_previous_day | ||
| SQL > | ||
|     -- All segments that share a parentId with any segment whose aggregates changed | ||
|     -- since the start of the previous day (Europe/Berlin). Siblings are included | ||
|     -- because the parent roll-up has to be recomputed from every child. | ||
|     SELECT id AS segmentId | ||
|     FROM segments | ||
|     WHERE parentId IN ( | ||
|         SELECT parentId | ||
|         FROM segments | ||
|         WHERE id IN ( | ||
|             SELECT DISTINCT segmentId | ||
|             FROM cdp_member_segment_aggregates_ds | ||
|             WHERE updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) | ||
|         ) | ||
|     ) | ||
|
|
||
| NODE cdp_member_aggregates_sink_daily_parent_segments_1 | ||
| SQL > | ||
|     % | ||
|     -- Rolls member aggregates up to the parent segment for every member and segment | ||
|     -- family touched since the previous day, finalizing the stored states with the | ||
|     -- -Merge combinators. The result is exported under the parent's id. | ||
|     SELECT | ||
|         parentId AS segmentId, | ||
|         memberId, | ||
|         '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
|         countMerge(activityCountState) AS activityCount, | ||
|         maxMerge(lastActiveState) AS lastActive, | ||
|         groupArrayDistinctMerge(activityTypesState) AS activityTypes, | ||
|         groupArrayDistinctMerge(activeOnState) AS activeOn, | ||
|         avgMerge(averageSentimentState) AS averageSentiment, | ||
|         now() AS updatedAt | ||
|     FROM cdp_member_segment_aggregates_ds AS cdp_aggs | ||
|     JOIN segments s ON s.id = cdp_aggs.segmentId | ||
|     WHERE | ||
|         cdp_aggs.segmentId IN (SELECT segmentId FROM segments_with_changed_aggs_previous_day) | ||
|         AND memberId IN (SELECT memberId FROM members_with_changed_aggs_previous_day) | ||
|         -- Segments without a parent must not be rolled up: without this filter they | ||
|         -- would all collapse into one group exported with segmentId = ''. | ||
|         -- (segments reportedly defaults parentId to '' -- schema not visible here.) | ||
|         AND parentId != '' | ||
|     GROUP BY parentId, memberId | ||
|
|
||
| # Sink settings: CSV export to Kafka, cron "0 1 * * *" = every day at 01:00 | ||
| # (scheduler time zone is not visible here -- presumably UTC, confirm). | ||
| TYPE SINK | ||
| EXPORT_SERVICE kafka | ||
| EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming | ||
| EXPORT_SCHEDULE 0 1 * * * | ||
| EXPORT_FORMAT csv | ||
| EXPORT_STRATEGY @new | ||
| EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| NODE grandparent_segment_aggs | ||
| SQL > | ||
|     % | ||
|     -- Backfill: finalized member aggregates rolled up to the grandparent segment. | ||
|     -- With bucket_id set, only grandparents whose cityHash64 falls in that bucket | ||
|     -- (modulo 5) are exported; without it, every grandparent is exported in one run. | ||
|     SELECT | ||
|         grandparentId AS segmentId, | ||
|         cdp_aggs.memberId, | ||
|         '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
|         countMerge(cdp_aggs.activityCountState) AS activityCount, | ||
|         maxMerge(cdp_aggs.lastActiveState) AS lastActive, | ||
|         groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, | ||
|         groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, | ||
|         avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, | ||
|         now() AS updatedAt | ||
|     FROM cdp_member_segment_aggregates_ds cdp_aggs | ||
|     JOIN segments s ON s.id = cdp_aggs.segmentId | ||
|     -- Always applied, with or without bucket_id: segments that have no grandparent | ||
|     -- must not be merged into a single group exported with segmentId = ''. | ||
|     -- (segments reportedly defaults grandparentId to '' -- schema not visible here.) | ||
|     WHERE | ||
|         grandparentId != '' | ||
|         {% if defined(bucket_id) %} | ||
|         AND cityHash64(grandparentId) % 5 | ||
|         = {{ | ||
|             UInt8( | ||
|                 bucket_id, | ||
|                 0, | ||
|                 description="This is bucket id of the activity segment", | ||
|                 required=False, | ||
|             ) | ||
|         }} | ||
|         {% end %} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Backfillers include NULL parent/grandparent IDs without bucket_id. Medium Severity. When … Additional Locations (1) |
||
| GROUP BY grandparentId, memberId | ||
|
|
||
| # Sink settings for the node above: CSV export to Kafka. The schedule is @on-demand, | ||
| # so there is no cron entry and the export only runs when it is triggered explicitly. | ||
| TYPE SINK | ||
| EXPORT_SERVICE kafka | ||
| EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming | ||
| EXPORT_SCHEDULE @on-demand | ||
| EXPORT_FORMAT csv | ||
| EXPORT_STRATEGY @new | ||
| EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| NODE leaf_segment_aggregates | ||
| SQL > | ||
|     % | ||
|     -- Backfill: finalized aggregates for every (segment, member) pair. With bucket_id | ||
|     -- set, only segments whose cityHash64 falls in that bucket (modulo 5) are exported; | ||
|     -- without it, the whole datasource is exported in one run. | ||
|     SELECT | ||
|         cdp_aggs.segmentId, | ||
|         cdp_aggs.memberId, | ||
|         '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
|         countMerge(cdp_aggs.activityCountState) AS activityCount, | ||
|         maxMerge(cdp_aggs.lastActiveState) AS lastActive, | ||
|         groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, | ||
|         groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, | ||
|         avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, | ||
|         now() AS updatedAt | ||
|     FROM cdp_member_segment_aggregates_ds cdp_aggs | ||
|     {% if defined(bucket_id) %} | ||
|     WHERE cityHash64(segmentId) % 5 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False) }} | ||
|     {% end %} | ||
|     GROUP BY segmentId, memberId | ||
|
|
||
| # Sink settings: CSV export to Kafka, run only when triggered (@on-demand). | ||
| TYPE SINK | ||
| EXPORT_SERVICE kafka | ||
| EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming | ||
| EXPORT_SCHEDULE @on-demand | ||
| EXPORT_FORMAT csv | ||
| EXPORT_STRATEGY @new | ||
| EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| NODE parent_segment_aggregates | ||
| SQL > | ||
|     % | ||
|     -- Backfill: finalized member aggregates rolled up to the parent segment. | ||
|     -- With bucket_id set, only parents whose cityHash64 falls in that bucket | ||
|     -- (modulo 5) are exported; without it, every parent is exported in one run. | ||
|     SELECT | ||
|         parentId AS segmentId, | ||
|         cdp_aggs.memberId, | ||
|         '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
|         countMerge(cdp_aggs.activityCountState) AS activityCount, | ||
|         maxMerge(cdp_aggs.lastActiveState) AS lastActive, | ||
|         groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, | ||
|         groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, | ||
|         avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, | ||
|         now() AS updatedAt | ||
|     FROM cdp_member_segment_aggregates_ds cdp_aggs | ||
|     JOIN segments s ON s.id = cdp_aggs.segmentId | ||
|     -- Always applied, with or without bucket_id: segments that have no parent must | ||
|     -- not be merged into a single group exported with segmentId = ''. | ||
|     -- (segments reportedly defaults parentId to '' -- schema not visible here.) | ||
|     WHERE | ||
|         parentId != '' | ||
|         {% if defined(bucket_id) %} | ||
|         AND cityHash64(parentId) % 5 | ||
|         = {{ | ||
|             UInt8( | ||
|                 bucket_id, | ||
|                 0, | ||
|                 description="This is bucket id of the activity segment", | ||
|                 required=False, | ||
|             ) | ||
|         }} | ||
|         {% end %} | ||
|     GROUP BY parentId, memberId | ||
|
|
||
| # Sink settings: CSV export to Kafka, run only when triggered (@on-demand). | ||
| TYPE SINK | ||
| EXPORT_SERVICE kafka | ||
| EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming | ||
| EXPORT_SCHEDULE @on-demand | ||
| EXPORT_FORMAT csv | ||
| EXPORT_STRATEGY @new | ||
| EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| NODE cdp_member_aggregates_sink_initial_snapshot_0 | ||
| SQL > | ||
|     -- Materialization: folds incoming activity rows into per-(segmentId, memberId) | ||
|     -- partial aggregate states. Each -State expression feeds the column of the same | ||
|     -- name in the target datasource. | ||
|     SELECT | ||
|         segmentId, | ||
|         memberId, | ||
|         '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
|         countState(activityId) AS activityCountState, | ||
|         maxState(timestamp) AS lastActiveState, | ||
|         groupArrayDistinctState(type) AS activityTypesState, | ||
|         groupArrayDistinctState(platform) AS activeOnState, | ||
|         avgState(sentimentScore) AS averageSentimentState, | ||
|         -- Table-qualified on purpose: this SELECT also defines an `updatedAt` alias, | ||
|         -- and ClickHouse lets an alias shadow a same-named column by default. Unqualified, | ||
|         -- this state would record the ingestion time instead of the activity's updatedAt. | ||
|         maxState(act.updatedAt) AS lastActivityUpdatedAtState, | ||
|         -- Ingestion time of this batch; the daily sinks use it to detect changes. | ||
|         now64(3) AS updatedAt | ||
|     FROM activityRelations_enrich_snapshot_MV_ds act | ||
|     GROUP BY segmentId, memberId | ||
|
|
||
| TYPE MATERIALIZED | ||
| DATASOURCE cdp_member_segment_aggregates_ds |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| NODE cdp_member_aggregates_sink_initial_snapshot_0 | ||
| SQL > | ||
|     -- Full rebuild of the member/segment aggregate states from the deduplicated | ||
|     -- activity relations. Here updatedAt is the newest activity updatedAt per group, | ||
|     -- not the time the copy ran. | ||
|     SELECT | ||
|         segmentId, | ||
|         memberId, | ||
|         '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
|         countState(activityId) AS activityCountState, | ||
|         maxState(timestamp) AS lastActiveState, | ||
|         groupArrayDistinctState(type) AS activityTypesState, | ||
|         groupArrayDistinctState(platform) AS activeOnState, | ||
|         avgState(sentimentScore) AS averageSentimentState, | ||
|         maxState(act.updatedAt) AS lastActivityUpdatedAtState, | ||
|         max(act.updatedAt) AS updatedAt | ||
|     FROM activityRelations_enriched_deduplicated_ds act | ||
|     GROUP BY segmentId, memberId | ||
|
|
||
| # Copy settings: triggered manually (@on-demand) and written to the target in replace mode. | ||
| TYPE COPY | ||
| TARGET_DATASOURCE cdp_member_segment_aggregates_ds | ||
| COPY_MODE replace | ||
| COPY_SCHEDULE @on-demand |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing filter for empty parentId/grandparentId causes incorrect aggregation
High Severity
The `segments` datasource defaults `parentId` and `grandparentId` to empty string for top-level segments. The parent and grandparent aggregation queries don't filter out empty values, causing all segments without parents/grandparents to be aggregated together into a single row with `segmentId = ''`. This combines unrelated data and produces invalid segment identifiers for Kafka export.
Additional Locations (2)
- services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe#L11-L21
- services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe#L15-L27