From 4f3f144375811e4b4a5c8ff26f32051f0fe1a8d6 Mon Sep 17 00:00:00 2001 From: Nico Ritschel Date: Fri, 22 May 2026 18:22:49 -0700 Subject: [PATCH] Add semantic model and person snapshot import --- docs/import-posthog.md | 8 +- models/activity_days.yml | 183 +++++++++++ models/attribution.yml | 263 +++++++++++++++ models/events.yml | 549 +++++++++++++++++++++++++++++++ models/first_event_retention.yml | 108 ++++++ models/groups.yml | 187 +++++++++++ models/identity_links.yml | 118 +++++++ models/metrics.yml | 78 +++++ models/pageviews.yml | 142 ++++++++ models/person_profiles.yml | 126 +++++++ models/persons.yml | 137 ++++++++ models/sessions.yml | 192 +++++++++++ src/bin/import_posthog.rs | 9 +- src/importer.rs | 466 +++++++++++++++++++++++++- 14 files changed, 2545 insertions(+), 21 deletions(-) create mode 100644 models/activity_days.yml create mode 100644 models/attribution.yml create mode 100644 models/events.yml create mode 100644 models/first_event_retention.yml create mode 100644 models/groups.yml create mode 100644 models/identity_links.yml create mode 100644 models/metrics.yml create mode 100644 models/pageviews.yml create mode 100644 models/person_profiles.yml create mode 100644 models/persons.yml create mode 100644 models/sessions.yml diff --git a/docs/import-posthog.md b/docs/import-posthog.md index cafaf3f..66bca66 100644 --- a/docs/import-posthog.md +++ b/docs/import-posthog.md @@ -2,7 +2,7 @@ Hogflare includes a host-side importer for backfilling an existing PostHog project into the same Cloudflare Pipeline sink used by the Worker. 
It reads PostHog's private API with a personal API key, then writes normalized rows to the pipeline: -- persons as `$identify` rows +- persons as `$identify` event rows and, when configured, `PersonPipelineRecord` snapshots - groups as `$groupidentify` rows - historical events from HogQL with original `timestamp`, `created_at`, and PostHog event `uuid` when available @@ -42,7 +42,10 @@ export IMPORT_STATE_FILE=".hogflare-import-state.jsonl" export IMPORT_TARGET_ACCOUNT_ID="" export IMPORT_TARGET_BUCKET="" export IMPORT_TARGET_TABLE="default.hogflare_events_v3" +export IMPORT_PERSONS_TARGET_TABLE="default.hogflare_persons_v2" export WRANGLER_R2_SQL_AUTH_TOKEN="" +export CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT="https://.ingest.cloudflare.com" +export CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN="" # falls back to event pipeline token export IMPORT_CLOUDFLARE_API_TOKEN="" # optional auto flush discovery export IMPORT_PIPELINE_FLUSH_SECS="300" # fallback if Pipelines read is unavailable ``` @@ -75,6 +78,8 @@ cargo run --bin import_posthog -- \ --personal-api-key "$POSTHOG_PERSONAL_API_KEY" \ --pipeline-endpoint "$CLOUDFLARE_PIPELINE_ENDPOINT" \ --pipeline-auth-token "$CLOUDFLARE_PIPELINE_AUTH_TOKEN" \ + --persons-pipeline-endpoint "$CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT" \ + --persons-pipeline-auth-token "$CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN" \ --hogflare-api-key phc_example \ --from 2025-01-01 \ --to 2025-02-01 \ @@ -91,6 +96,7 @@ cargo run --bin import_posthog -- \ --target-account-id "$CLOUDFLARE_ACCOUNT_ID" \ --target-bucket hogflare \ --target-table default.hogflare_events_v3 \ + --persons-target-table default.hogflare_persons_v2 \ --target-auth-token "$WRANGLER_R2_SQL_AUTH_TOKEN" \ --cloudflare-api-token "$CLOUDFLARE_API_TOKEN" ``` diff --git a/models/activity_days.yml b/models/activity_days.yml new file mode 100644 index 0000000..6748111 --- /dev/null +++ b/models/activity_days.yml @@ -0,0 +1,183 @@ +models: + - name: activity_days + sql: | + with latest_profiles as ( 
+ select * + from ( + select + *, + row_number() over (partition by person_id order by updated_at desc, version desc, created_at desc) as profile_rank + from {{ persons_table }} + where person_id is not null + ) + where profile_rank = 1 + ), + identity_map as ( + select + resolved_person_id, + canonical_distinct_id, + linked_distinct_id + from ( + select + person_id as resolved_person_id, + canonical_distinct_id, + distinct_id as linked_distinct_id, + row_number() over (partition by distinct_id order by updated_at desc, version desc, created_at desc) as identity_rank + from latest_profiles, + unnest(distinct_ids) as identity_distinct_ids(distinct_id) + where distinct_id is not null + ) + where identity_rank = 1 + ), + event_base as ( + select + uuid, + event as event_type, + distinct_id as actor_id, + person_id, + coalesce(person_id, identity_map.resolved_person_id) as resolved_person_id, + identity_map.canonical_distinct_id, + coalesce(timestamp, created_at) as event_time, + date_trunc('day', coalesce(timestamp, created_at)) as activity_day, + properties, + api_key, + /* same lookup order as events.yml session_id, so rows kept by fallback_events are exactly the events whose session_key falls back to actor/day */ coalesce(json_extract_string(properties, '$.$session_id'), json_extract_string(properties, '$.session_id'), json_extract_string(context, '$.session_id')) as session_id, + coalesce(json_extract_string(properties, '$.$pathname'), json_extract_string(properties, '$.pathname'), json_extract_string(properties, '$.path')) as pathname, + coalesce(json_extract_string(properties, '$.$current_url'), json_extract_string(properties, '$.current_url'), json_extract_string(properties, '$.url')) as current_url, + coalesce(json_extract_string(properties, '$.$referrer'), json_extract_string(properties, '$.referrer')) as referrer, + coalesce(json_extract_string(properties, '$.$utm_source'), json_extract_string(properties, '$.utm_source'), json_extract_string(properties, '$.$initial_utm_source')) as utm_source, + coalesce(json_extract_string(properties, '$.$utm_medium'), json_extract_string(properties, '$.utm_medium'), json_extract_string(properties,
'$.$initial_utm_medium')) as utm_medium, + coalesce(json_extract_string(properties, '$.$utm_campaign'), json_extract_string(properties, '$.utm_campaign'), json_extract_string(properties, '$.$initial_utm_campaign')) as utm_campaign, + coalesce(json_extract_string(properties, '$.$browser'), json_extract_string(properties, '$.browser')) as browser, + coalesce(json_extract_string(properties, '$.$os'), json_extract_string(properties, '$.os')) as os, + coalesce(json_extract_string(properties, '$.$device_type'), json_extract_string(properties, '$.device_type')) as device_type, + coalesce(json_extract_string(properties, '$.$geoip_country_code'), json_extract_string(properties, '$.country')) as geo_country_code + from {{ events_table }} + left join identity_map on distinct_id = identity_map.linked_distinct_id + where coalesce(timestamp, created_at) is not null + ), + fallback_events as ( + select * + from event_base + where session_id is null + ) + select + actor_id || ':' || strftime(activity_day, '%Y-%m-%d') as activity_day_key, + actor_id, + max(resolved_person_id) as person_id, + max(canonical_distinct_id) as canonical_distinct_id, + first(api_key order by event_time asc) as api_key, + activity_day, + min(event_time) as first_activity_at, + max(event_time) as last_activity_at, + date_diff('second', min(event_time), max(event_time)) as duration_seconds, + count(*) as event_count, + sum(case when event_type in ('$pageview', 'page_view', '$screen', 'screen') then 1 else 0 end) as pageview_count, + sum(case when event_type = '$autocapture' then 1 else 0 end) as autocapture_count, + first(pathname order by event_time asc) as first_path, + last(pathname order by event_time asc) as last_path, + first(current_url order by event_time asc) as first_url, + last(current_url order by event_time asc) as last_url, + first(referrer order by event_time asc) as referrer, + first(utm_source order by event_time asc) as utm_source, + first(utm_medium order by event_time asc) as utm_medium, 
+ first(utm_campaign order by event_time asc) as utm_campaign, + first(browser order by event_time asc) as browser, + first(os order by event_time asc) as os, + first(device_type order by event_time asc) as device_type, + first(geo_country_code order by event_time asc) as geo_country_code + from fallback_events + group by actor_id, activity_day + description: Actor/day fallback activity rollup for events that did not carry a PostHog $session_id. + primary_key: activity_day_key + default_time_dimension: activity_day + default_grain: day + + relationships: + - name: persons + type: many_to_one + foreign_key: actor_id + primary_key: actor_id + + dimensions: + - name: activity_day_key + type: categorical + - name: actor_id + type: categorical + - name: person_id + type: categorical + - name: canonical_distinct_id + type: categorical + - name: api_key + type: categorical + - name: first_path + type: categorical + - name: last_path + type: categorical + - name: first_url + type: categorical + - name: last_url + type: categorical + - name: referrer + type: categorical + - name: utm_source + type: categorical + - name: utm_medium + type: categorical + - name: utm_campaign + type: categorical + - name: browser + type: categorical + - name: os + type: categorical + - name: device_type + type: categorical + - name: geo_country_code + type: categorical + - name: duration_seconds + type: numeric + - name: event_count + type: numeric + - name: pageview_count + type: numeric + - name: activity_day + type: time + granularity: day + - name: first_activity_at + type: time + granularity: day + - name: last_activity_at + type: time + granularity: day + + metrics: + - name: activity_day_count + agg: count + description: Actor/day fallback rows for events without SDK session ids. + - name: active_users + agg: count_distinct + sql: actor_id + description: Distinct actors with fallback activity days. 
+ - name: fallback_events + agg: sum + sql: event_count + description: Events included in actor/day fallback rollups. + - name: fallback_pageviews + agg: sum + sql: pageview_count + description: Pageviews included in actor/day fallback rollups. + - name: bounced_activity_days + agg: count + filters: + - pageview_count <= 1 + description: Fallback actor/day rows with one or fewer pageviews. + - name: engaged_activity_days + agg: count + filters: + - event_count > 1 or duration_seconds >= 10 + description: Fallback actor/day rows with multiple events or at least 10 seconds duration. + - name: total_activity_seconds + agg: sum + sql: duration_seconds + description: Total duration across fallback actor/day rollups. + diff --git a/models/attribution.yml b/models/attribution.yml new file mode 100644 index 0000000..e7a97df --- /dev/null +++ b/models/attribution.yml @@ -0,0 +1,263 @@ +models: + - name: attribution + sql: | + with latest_profiles as ( + select * + from ( + select + *, + row_number() over (partition by person_id order by updated_at desc, version desc, created_at desc) as profile_rank + from {{ persons_table }} + where person_id is not null + ) + where profile_rank = 1 + ), + identity_map as ( + select + resolved_person_id, + canonical_distinct_id, + linked_distinct_id + from ( + select + person_id as resolved_person_id, + canonical_distinct_id, + distinct_id as linked_distinct_id, + row_number() over (partition by distinct_id order by updated_at desc, version desc, created_at desc) as identity_rank + from latest_profiles, + unnest(distinct_ids) as identity_distinct_ids(distinct_id) + where distinct_id is not null + ) + where identity_rank = 1 + ), + event_base as ( + select + uuid, + event as event_type, + distinct_id as actor_id, + person_id, + coalesce(person_id, identity_map.resolved_person_id) as resolved_person_id, + identity_map.canonical_distinct_id, + coalesce(timestamp, created_at) as event_time, + api_key, + coalesce( + 
json_extract_string(properties, '$.$current_url'), + json_extract_string(properties, '$.current_url'), + json_extract_string(properties, '$.url') + ) as current_url, + coalesce( + json_extract_string(properties, '$.$referrer'), + json_extract_string(properties, '$.referrer') + ) as referrer, + coalesce( + json_extract_string(properties, '$.$initial_referrer'), + json_extract_string(properties, '$.initial_referrer') + ) as initial_referrer, + coalesce( + json_extract_string(properties, '$.$utm_source'), + json_extract_string(properties, '$.utm_source') + ) as utm_source, + coalesce( + json_extract_string(properties, '$.$utm_medium'), + json_extract_string(properties, '$.utm_medium') + ) as utm_medium, + coalesce( + json_extract_string(properties, '$.$utm_campaign'), + json_extract_string(properties, '$.utm_campaign') + ) as utm_campaign, + coalesce( + json_extract_string(properties, '$.$utm_content'), + json_extract_string(properties, '$.utm_content') + ) as utm_content, + coalesce( + json_extract_string(properties, '$.$utm_term'), + json_extract_string(properties, '$.utm_term') + ) as utm_term, + coalesce( + json_extract_string(properties, '$.$initial_utm_source'), + json_extract_string(properties, '$.initial_utm_source') + ) as initial_utm_source, + coalesce( + json_extract_string(properties, '$.$initial_utm_medium'), + json_extract_string(properties, '$.initial_utm_medium') + ) as initial_utm_medium, + coalesce( + json_extract_string(properties, '$.$initial_utm_campaign'), + json_extract_string(properties, '$.initial_utm_campaign') + ) as initial_utm_campaign, + coalesce( + json_extract_string(properties, '$.$initial_utm_content'), + json_extract_string(properties, '$.initial_utm_content') + ) as initial_utm_content, + coalesce( + json_extract_string(properties, '$.$initial_utm_term'), + json_extract_string(properties, '$.initial_utm_term') + ) as initial_utm_term + from {{ events_table }} + left join identity_map on distinct_id = identity_map.linked_distinct_id 
+ where coalesce(timestamp, created_at) is not null + ), + touches as ( + select + *, + coalesce(utm_source, initial_utm_source) as touch_utm_source, + coalesce(utm_medium, initial_utm_medium) as touch_utm_medium, + coalesce(utm_campaign, initial_utm_campaign) as touch_utm_campaign, + coalesce(utm_content, initial_utm_content) as touch_utm_content, + coalesce(utm_term, initial_utm_term) as touch_utm_term, + coalesce(referrer, initial_referrer) as touch_referrer, + ( + utm_source is not null + or utm_medium is not null + or utm_campaign is not null + or utm_content is not null + or utm_term is not null + or initial_utm_source is not null + or initial_utm_medium is not null + or initial_utm_campaign is not null + or initial_utm_content is not null + or initial_utm_term is not null + or referrer is not null + or initial_referrer is not null + ) as has_attribution + from event_base + ) + select + actor_id as attribution_id, + actor_id, + max(resolved_person_id) as person_id, + max(canonical_distinct_id) as canonical_distinct_id, + first(api_key order by event_time asc) as api_key, + min(event_time) as first_seen_at, + max(event_time) as last_seen_at, + min(event_time) filter (where has_attribution) as first_touch_at, + max(event_time) filter (where has_attribution) as last_touch_at, + arg_min(current_url, event_time) filter (where current_url is not null) as first_url, + arg_max(current_url, event_time) filter (where current_url is not null) as last_url, + arg_min(touch_referrer, event_time) filter (where touch_referrer is not null) as first_referrer, + arg_max(touch_referrer, event_time) filter (where touch_referrer is not null) as last_referrer, + arg_min(touch_utm_source, event_time) filter (where touch_utm_source is not null) as first_utm_source, + arg_max(touch_utm_source, event_time) filter (where touch_utm_source is not null) as last_utm_source, + arg_min(touch_utm_medium, event_time) filter (where touch_utm_medium is not null) as first_utm_medium, + 
arg_max(touch_utm_medium, event_time) filter (where touch_utm_medium is not null) as last_utm_medium, + arg_min(touch_utm_campaign, event_time) filter (where touch_utm_campaign is not null) as first_utm_campaign, + arg_max(touch_utm_campaign, event_time) filter (where touch_utm_campaign is not null) as last_utm_campaign, + arg_min(touch_utm_content, event_time) filter (where touch_utm_content is not null) as first_utm_content, + arg_max(touch_utm_content, event_time) filter (where touch_utm_content is not null) as last_utm_content, + arg_min(touch_utm_term, event_time) filter (where touch_utm_term is not null) as first_utm_term, + arg_max(touch_utm_term, event_time) filter (where touch_utm_term is not null) as last_utm_term, + max(has_attribution) as has_attribution, + max(touch_utm_source is not null or touch_utm_medium is not null or touch_utm_campaign is not null or touch_utm_content is not null or touch_utm_term is not null) as has_utm, + max(touch_referrer is not null) as has_referrer, + max(current_url is not null) as has_current_url, + count(*) as event_count, + count(*) filter (where has_attribution) as attribution_event_count + from touches + group by actor_id + description: First-touch and last-touch attribution by actor using PostHog UTM, initial UTM, referrer, and current URL properties. 
+ primary_key: attribution_id + default_time_dimension: first_seen_at + default_grain: day + + relationships: + - name: persons + type: many_to_one + foreign_key: actor_id + primary_key: actor_id + - name: person_profiles + type: many_to_one + foreign_key: person_id + primary_key: person_id + + dimensions: + - name: attribution_id + type: categorical + - name: actor_id + type: categorical + - name: person_id + type: categorical + - name: canonical_distinct_id + type: categorical + - name: api_key + type: categorical + - name: first_url + type: categorical + - name: last_url + type: categorical + - name: first_referrer + type: categorical + - name: last_referrer + type: categorical + - name: first_utm_source + type: categorical + - name: last_utm_source + type: categorical + - name: first_utm_medium + type: categorical + - name: last_utm_medium + type: categorical + - name: first_utm_campaign + type: categorical + - name: last_utm_campaign + type: categorical + - name: first_utm_content + type: categorical + - name: last_utm_content + type: categorical + - name: first_utm_term + type: categorical + - name: last_utm_term + type: categorical + - name: has_attribution + type: boolean + - name: has_utm + type: boolean + - name: has_referrer + type: boolean + - name: has_current_url + type: boolean + - name: event_count + type: numeric + - name: attribution_event_count + type: numeric + - name: first_seen_at + type: time + granularity: day + - name: last_seen_at + type: time + granularity: day + - name: first_touch_at + type: time + granularity: day + - name: last_touch_at + type: time + granularity: day + + metrics: + - name: actor_count + agg: count + description: Actors included in attribution rollups. + - name: attributed_actors + agg: count + filters: + - has_attribution + description: Actors with at least one UTM, initial UTM, or referrer touch. + - name: actors_with_utm + agg: count + filters: + - has_utm + description: Actors with UTM or initial UTM properties. 
+ - name: actors_with_referrer + agg: count + filters: + - has_referrer + description: Actors with referrer or initial referrer properties. + - name: actors_with_current_url + agg: count + filters: + - has_current_url + description: Actors with at least one current URL. + - name: attribution_events + agg: sum + sql: attribution_event_count + description: Events carrying UTM, initial UTM, or referrer attribution signals. + diff --git a/models/events.yml b/models/events.yml new file mode 100644 index 0000000..a2b0587 --- /dev/null +++ b/models/events.yml @@ -0,0 +1,549 @@ +models: + - name: events + sql: | + with latest_profiles as ( + select * + from ( + select + *, + row_number() over (partition by person_id order by updated_at desc, version desc, created_at desc) as profile_rank + from {{ persons_table }} + where person_id is not null + ) + where profile_rank = 1 + ), + identity_map as ( + select + resolved_person_id, + canonical_distinct_id, + linked_distinct_id + from ( + select + person_id as resolved_person_id, + canonical_distinct_id, + distinct_id as linked_distinct_id, + row_number() over (partition by distinct_id order by updated_at desc, version desc, created_at desc) as identity_rank + from latest_profiles, + unnest(distinct_ids) as identity_distinct_ids(distinct_id) + where distinct_id is not null + ) + where identity_rank = 1 + ), + source as ( + select + uuid, + team_id, + source, + event as event_type, + distinct_id, + distinct_id as actor_id, + coalesce(person_id, identity_map.resolved_person_id, distinct_id) as identity_id, + coalesce(person_id, identity_map.resolved_person_id) as resolved_person_id, + identity_map.canonical_distinct_id, + created_at, + timestamp, + coalesce(timestamp, created_at) as event_time, + properties, + context, + person_id, + person_created_at, + person_properties, + group0, + group1, + group2, + group3, + group4, + group_properties, + api_key, + extra, + coalesce( + json_extract_string(properties, '$.$session_id'), + 
json_extract_string(properties, '$.session_id'), + json_extract_string(context, '$.session_id') + ) as session_id, + coalesce( + json_extract_string(properties, '$.$window_id'), + json_extract_string(properties, '$.window_id') + ) as window_id, + coalesce( + json_extract_string(properties, '$.$current_url'), + json_extract_string(properties, '$.current_url'), + json_extract_string(properties, '$.url') + ) as current_url, + coalesce( + json_extract_string(properties, '$.$pathname'), + json_extract_string(properties, '$.pathname'), + json_extract_string(properties, '$.path') + ) as pathname, + coalesce( + json_extract_string(properties, '$.$host'), + json_extract_string(properties, '$.host') + ) as host, + coalesce( + json_extract_string(properties, '$.$title'), + json_extract_string(properties, '$.title') + ) as page_title, + coalesce( + json_extract_string(properties, '$.$referrer'), + json_extract_string(properties, '$.referrer') + ) as referrer, + coalesce( + json_extract_string(properties, '$.$utm_source'), + json_extract_string(properties, '$.utm_source'), + json_extract_string(properties, '$.$initial_utm_source') + ) as utm_source, + coalesce( + json_extract_string(properties, '$.$utm_medium'), + json_extract_string(properties, '$.utm_medium'), + json_extract_string(properties, '$.$initial_utm_medium') + ) as utm_medium, + coalesce( + json_extract_string(properties, '$.$utm_campaign'), + json_extract_string(properties, '$.utm_campaign'), + json_extract_string(properties, '$.$initial_utm_campaign') + ) as utm_campaign, + coalesce( + json_extract_string(properties, '$.$utm_content'), + json_extract_string(properties, '$.utm_content'), + json_extract_string(properties, '$.$initial_utm_content') + ) as utm_content, + coalesce( + json_extract_string(properties, '$.$utm_term'), + json_extract_string(properties, '$.utm_term'), + json_extract_string(properties, '$.$initial_utm_term') + ) as utm_term, + coalesce( + json_extract_string(properties, '$.$browser'), + 
json_extract_string(properties, '$.browser') + ) as browser, + coalesce( + json_extract_string(properties, '$.$browser_version'), + json_extract_string(properties, '$.browser_version') + ) as browser_version, + coalesce( + json_extract_string(properties, '$.$os'), + json_extract_string(properties, '$.os') + ) as os, + coalesce( + json_extract_string(properties, '$.$os_version'), + json_extract_string(properties, '$.os_version') + ) as os_version, + coalesce( + json_extract_string(properties, '$.$device_type'), + json_extract_string(properties, '$.device_type') + ) as device_type, + coalesce( + json_extract_string(properties, '$.$lib'), + json_extract_string(properties, '$.library'), + json_extract_string(extra, '$.library') + ) as library, + coalesce( + json_extract_string(properties, '$.$lib_version'), + json_extract_string(properties, '$.library_version') + ) as library_version, + coalesce( + json_extract_string(properties, '$.$geoip_country_code'), + json_extract_string(properties, '$.country') + ) as geo_country_code, + coalesce( + json_extract_string(properties, '$.$geoip_subdivision_1_code'), + json_extract_string(properties, '$.region') + ) as geo_region, + coalesce( + json_extract_string(properties, '$.$geoip_city_name'), + json_extract_string(properties, '$.city') + ) as geo_city, + json_extract_string(properties, '$.$geoip_time_zone') as geo_timezone, + json_extract_string(properties, '$.cf_colo') as cf_colo, + try_cast(json_extract_string(properties, '$.cf_asn') as bigint) as cf_asn, + coalesce( + json_extract_string(extra, '$.group_type'), + json_extract_string(properties, '$.group_type') + ) as group_type, + coalesce( + json_extract_string(extra, '$.group_key'), + json_extract_string(properties, '$.group_key') + ) as group_key, + json_extract_string(extra, '$.alias') as alias, + try_cast(json_extract_string(extra, '$.$sent_at') as timestamp) as sent_at, + coalesce( + json_extract_string(properties, '$.plan'), + json_extract_string(person_properties, 
'$.plan') + ) as plan, + coalesce( + json_extract_string(properties, '$.email'), + json_extract_string(person_properties, '$.email') + ) as email, + nullif(regexp_extract( + coalesce( + json_extract_string(properties, '$.email'), + json_extract_string(person_properties, '$.email') + ), + '@([^@]+)$', + 1 + ), '') as email_domain + from {{ events_table }} + left join identity_map on distinct_id = identity_map.linked_distinct_id + ) + select + *, + coalesce(session_id, actor_id || ':' || strftime(event_time, '%Y-%m-%d')) as session_key, + event_type not in ('$identify', '$groupidentify', '$create_alias', '$engage', '$snapshot') as is_capture_event, + event_type in ('$pageview', 'page_view', '$screen', 'screen') as is_pageview_event, + event_type in ('$identify', '$engage') as is_person_mutation_event, + event_type = '$groupidentify' as is_group_event, + event_type = '$snapshot' as is_session_recording_event, + resolved_person_id is not null as has_person, + group0 is not null or group1 is not null or group2 is not null or group3 is not null or group4 is not null as has_group, + session_id is not null as has_session_id + from source + description: Canonical Hogflare event fact model over the Cloudflare Pipeline Iceberg table. 
+ primary_key: uuid + default_time_dimension: event_time + default_grain: day + metadata: + source_table_parameter: events_table + persons_table_parameter: persons_table + + relationships: + - name: persons + type: many_to_one + foreign_key: actor_id + primary_key: actor_id + - name: sessions + type: many_to_one + foreign_key: session_id + primary_key: session_id + - name: activity_days + type: many_to_one + foreign_key: session_key + primary_key: activity_day_key + - name: person_profiles + type: many_to_one + foreign_key: resolved_person_id + primary_key: person_id + - name: identity_links + type: many_to_one + foreign_key: actor_id + primary_key: distinct_id + + dimensions: + - name: uuid + type: categorical + description: Unique pipeline event id. + - name: event_type + type: categorical + description: PostHog event name. + - name: source + type: categorical + description: Event source assigned by Hogflare. + - name: team_id + type: numeric + description: Optional PostHog team id assigned by the Worker. + - name: api_key + type: categorical + description: PostHog project API key. + - name: distinct_id + type: categorical + description: Original PostHog distinct id. + - name: actor_id + type: categorical + description: PostHog distinct id used as the stable journey entity for funnels, sessions, retention, and cohorts. + - name: identity_id + type: categorical + description: Resolved person id when available, otherwise distinct id. + - name: resolved_person_id + type: categorical + description: Resolved Hogflare person id from the event row or the person distinct-id bridge. + - name: canonical_distinct_id + type: categorical + description: Canonical PostHog distinct id from the person distinct-id bridge when available. + - name: person_id + type: categorical + description: Stable Hogflare person id. + - name: session_id + type: categorical + description: SDK session id from event properties. 
+ - name: session_key + type: categorical + description: Session id when present, otherwise actor/day fallback for session-style analysis. + - name: window_id + type: categorical + description: SDK window id from event properties. + - name: current_url + type: categorical + description: Browser current URL. + - name: pathname + type: categorical + description: Browser path. + - name: host + type: categorical + description: Browser host. + - name: page_title + type: categorical + description: Browser page title. + - name: referrer + type: categorical + description: Browser referrer. + - name: utm_source + type: categorical + description: UTM source. + - name: utm_medium + type: categorical + description: UTM medium. + - name: utm_campaign + type: categorical + description: UTM campaign. + - name: utm_content + type: categorical + description: UTM content. + - name: utm_term + type: categorical + description: UTM term. + - name: browser + type: categorical + description: Browser name. + - name: browser_version + type: categorical + description: Browser version. + - name: os + type: categorical + description: Operating system. + - name: os_version + type: categorical + description: Operating system version. + - name: device_type + type: categorical + description: Device type. + - name: library + type: categorical + description: SDK/library name. + - name: library_version + type: categorical + description: SDK/library version. + - name: geo_country_code + type: categorical + description: Country code from Cloudflare enrichment or event properties. + - name: geo_region + type: categorical + description: Region/subdivision from Cloudflare enrichment or event properties. + - name: geo_city + type: categorical + description: City from Cloudflare enrichment or event properties. + - name: geo_timezone + type: categorical + description: Timezone from Cloudflare enrichment. + - name: cf_colo + type: categorical + description: Cloudflare colo. 
+ - name: cf_asn + type: numeric + description: Cloudflare autonomous system number. + - name: group0 + type: categorical + description: First configured group key. + - name: group1 + type: categorical + description: Second configured group key. + - name: group2 + type: categorical + description: Third configured group key. + - name: group3 + type: categorical + description: Fourth configured group key. + - name: group4 + type: categorical + description: Fifth configured group key. + - name: group_type + type: categorical + description: Group type carried by group identify events. + - name: group_key + type: categorical + description: Group key carried by group identify events. + - name: alias + type: categorical + description: Alias id carried by create-alias events. + - name: plan + type: categorical + description: Common person or event plan property. + - name: email_domain + type: categorical + description: Domain extracted from a common email property. + - name: is_capture_event + type: boolean + description: True for non-system capture events. + - name: is_pageview_event + type: boolean + description: True for pageview or screen events. + - name: is_person_mutation_event + type: boolean + description: True for identify or engage events. + - name: is_group_event + type: boolean + description: True for group identify events. + - name: is_session_recording_event + type: boolean + description: True for session recording snapshot events. + - name: has_person + type: boolean + description: True when Hogflare resolved a person id. + - name: has_group + type: boolean + description: True when any configured group slot is populated. + - name: has_session_id + type: boolean + description: True when an SDK session id exists. + - name: created_at + type: time + granularity: day + description: Timestamp when Hogflare created the pipeline event. + - name: timestamp + type: time + granularity: day + description: Client-supplied event timestamp when present. 
+ - name: person_created_at + type: time + granularity: day + description: First-seen timestamp for the resolved person record. + - name: sent_at + type: time + granularity: day + description: SDK batch sent-at timestamp when present. + - name: event_time + type: time + granularity: day + description: Client timestamp when present, otherwise ingestion timestamp. + + metrics: + - name: event_count + agg: count + description: Total number of events. + - name: capture_events + agg: count + filters: + - is_capture_event + description: Count of non-system capture events. + - name: custom_events + agg: count + filters: + - is_capture_event and not is_pageview_event + description: Count of non-system, non-pageview events. + - name: unique_users + agg: count_distinct + sql: actor_id + description: Distinct actor ids. + - name: known_persons + agg: count_distinct + sql: resolved_person_id + description: Distinct resolved Hogflare person ids from event rows or identity links. + - name: anonymous_users + agg: count_distinct + sql: actor_id + filters: + - resolved_person_id is null + description: Distinct actors without a resolved person id. + - name: unique_sessions + agg: count_distinct + sql: session_key + description: Distinct SDK sessions, falling back to actor/day. + - name: pageviews + agg: count + filters: + - is_pageview_event + description: Pageview and screen events. + - name: autocaptures + agg: count + filters: + - "event_type = '$autocapture'" + description: Autocapture events. + - name: identify_events + agg: count + filters: + - "event_type = '$identify'" + description: Identify events. + - name: group_identify_events + agg: count + filters: + - "event_type = '$groupidentify'" + description: Group identify events. + - name: alias_events + agg: count + filters: + - "event_type = '$create_alias'" + description: Alias creation events. + - name: engage_events + agg: count + filters: + - "event_type = '$engage'" + description: Engage/person mutation events. 
+ - name: snapshot_events + agg: count + filters: + - "event_type = '$snapshot'" + description: Session recording snapshot events. + - name: grouped_events + agg: count + filters: + - has_group + description: Events linked to at least one configured group key. + - name: events_with_properties + agg: count + filters: + - properties is not null + description: Events with a properties JSON payload. + - name: events_with_context + agg: count + filters: + - context is not null + description: Events with a context JSON payload. + - name: unique_groups + agg: count_distinct + sql: coalesce(group0, group1, group2, group3, group4) + description: Distinct group keys, taking only the first populated group slot (group0 through group4) per event. + - name: unique_api_keys + agg: count_distinct + sql: api_key + description: Distinct PostHog project API keys. + - name: power_users + type: cohort + entity: actor_id + inner_metrics: + - name: user_events + agg: count + having: user_events >= 10 + agg: count + description: Count of actors with at least 10 events in the query scope. + - name: multi_session_users + type: cohort + entity: actor_id + inner_metrics: + - name: session_count + agg: count_distinct + sql: session_key + having: session_count >= 2 + agg: count + description: Count of actors with at least two sessions in the query scope. + + segments: + - name: capture_events + sql: is_capture_event + description: Non-system capture events. + - name: pageviews + sql: is_pageview_event + description: Pageview and screen events. + - name: custom_events + sql: is_capture_event and not is_pageview_event + description: Non-system, non-pageview events. + - name: identified + sql: resolved_person_id is not null + description: Events linked to a resolved person. + - name: anonymous + sql: resolved_person_id is null + description: Events not linked to a resolved person. + - name: grouped + sql: has_group + description: Events linked to a group.
+ - name: session_recordings + sql: is_session_recording_event + description: Session recording snapshot events. + diff --git a/models/first_event_retention.yml b/models/first_event_retention.yml new file mode 100644 index 0000000..2ec3afe --- /dev/null +++ b/models/first_event_retention.yml @@ -0,0 +1,108 @@ +models: + - name: first_event_retention + sql: | + with event_base as ( + select + distinct_id as actor_id, + date_trunc('day', coalesce(timestamp, created_at)) as event_day + from {{ events_table }} + where coalesce(timestamp, created_at) is not null + ), + cohorts as ( + select + actor_id, + min(event_day) as cohort_day + from event_base + group by actor_id + ), + activity_days as ( + select + actor_id, + event_day as activity_day + from event_base + group by actor_id, event_day + ), + cohort_sizes as ( + select + cohort_day, + count(*) as cohort_size + from cohorts + group by cohort_day + ), + retained as ( + select + cohorts.cohort_day, + activity_days.activity_day, + date_diff('day', cohorts.cohort_day, activity_days.activity_day) as period_day, + count(activity_days.actor_id) as active_users + from cohorts + join activity_days on cohorts.actor_id = activity_days.actor_id + where activity_days.activity_day >= cohorts.cohort_day + group by cohorts.cohort_day, activity_days.activity_day + ) + select + cast(retained.cohort_day as varchar) || ':' || cast(retained.period_day as varchar) as retention_id, + retained.cohort_day, + retained.activity_day, + retained.period_day, + retained.active_users, + cohort_sizes.cohort_size, + cast(retained.active_users as double) / nullif(cohort_sizes.cohort_size, 0) as retention_rate + from retained + join cohort_sizes on retained.cohort_day = cohort_sizes.cohort_day + where retained.period_day between 0 and 30 + description: Daily first-event retention, implemented with R2 SQL-supported grouping instead of SELECT DISTINCT. 
+ primary_key: retention_id + default_time_dimension: cohort_day + default_grain: day + + dimensions: + - name: retention_id + type: categorical + - name: period_day + type: numeric + - name: active_users + type: numeric + - name: cohort_size + type: numeric + - name: retention_rate + type: numeric + - name: cohort_day + type: time + granularity: day + - name: activity_day + type: time + granularity: day + + metrics: + - name: cohort_users + agg: sum + sql: cohort_size + filters: + - period_day = 0 + description: Users in the original cohort, counted once per cohort. + - name: day_0_users + agg: sum + sql: active_users + filters: + - period_day = 0 + description: Active users on the cohort day. + - name: day_1_retained_users + agg: sum + sql: active_users + filters: + - period_day = 1 + description: Active users one day after first event. + - name: day_7_retained_users + agg: sum + sql: active_users + filters: + - period_day = 7 + description: Active users seven days after first event. + - name: day_30_retained_users + agg: sum + sql: active_users + filters: + - period_day = 30 + description: Active users thirty days after first event. 
+ diff --git a/models/groups.yml b/models/groups.yml new file mode 100644 index 0000000..5bef450 --- /dev/null +++ b/models/groups.yml @@ -0,0 +1,187 @@ +models: + - name: groups + sql: | + with event_base as ( + select + uuid, + event as event_type, + distinct_id as actor_id, + coalesce(timestamp, created_at) as event_time, + group0, + group1, + group2, + group3, + group4, + group_properties, + extra, + api_key + from {{ events_table }} + where coalesce(timestamp, created_at) is not null + ), + event_groups as ( + select + uuid || ':group0:' || group0 as group_event_id, + uuid, + event_type, + actor_id, + event_time, + api_key, + 'group0' as group_slot, + 0 as group_slot_index, + 'company' as configured_group_type, + group0 as group_key, + json_extract_string(extra, '$.group_type') as posthog_group_type, + json_extract_string(extra, '$.group_key') as posthog_group_key, + group_properties + from event_base + where group0 is not null + union all + select + uuid || ':group1:' || group1 as group_event_id, + uuid, + event_type, + actor_id, + event_time, + api_key, + 'group1' as group_slot, + 1 as group_slot_index, + 'team' as configured_group_type, + group1 as group_key, + json_extract_string(extra, '$.group_type') as posthog_group_type, + json_extract_string(extra, '$.group_key') as posthog_group_key, + group_properties + from event_base + where group1 is not null + union all + select + uuid || ':group2:' || group2 as group_event_id, + uuid, + event_type, + actor_id, + event_time, + api_key, + 'group2' as group_slot, + 2 as group_slot_index, + 'project' as configured_group_type, + group2 as group_key, + json_extract_string(extra, '$.group_type') as posthog_group_type, + json_extract_string(extra, '$.group_key') as posthog_group_key, + group_properties + from event_base + where group2 is not null + union all + select + uuid || ':group3:' || group3 as group_event_id, + uuid, + event_type, + actor_id, + event_time, + api_key, + 'group3' as group_slot, + 3 as 
group_slot_index, + 'org' as configured_group_type, + group3 as group_key, + json_extract_string(extra, '$.group_type') as posthog_group_type, + json_extract_string(extra, '$.group_key') as posthog_group_key, + group_properties + from event_base + where group3 is not null + union all + select + uuid || ':group4:' || group4 as group_event_id, + uuid, + event_type, + actor_id, + event_time, + api_key, + 'group4' as group_slot, + 4 as group_slot_index, + 'workspace' as configured_group_type, + group4 as group_key, + json_extract_string(extra, '$.group_type') as posthog_group_type, + json_extract_string(extra, '$.group_key') as posthog_group_key, + group_properties + from event_base + where group4 is not null + ) + select + group_event_id, + uuid, + event_type, + actor_id, + event_time, + api_key, + group_slot, + group_slot_index, + configured_group_type, + coalesce(posthog_group_type, configured_group_type) as group_type, + posthog_group_type, + group_key, + posthog_group_key, + group_properties + from event_groups + description: One row per grouped event per populated PostHog group slot. + primary_key: group_event_id + default_time_dimension: event_time + default_grain: day + + relationships: + - name: events + type: many_to_one + foreign_key: uuid + primary_key: uuid + + dimensions: + - name: group_event_id + type: categorical + - name: uuid + type: categorical + - name: event_type + type: categorical + - name: actor_id + type: categorical + - name: api_key + type: categorical + - name: group_slot + type: categorical + - name: group_slot_index + type: numeric + - name: configured_group_type + type: categorical + - name: group_key + type: categorical + - name: group_type + type: categorical + - name: posthog_group_type + type: categorical + - name: posthog_group_key + type: categorical + - name: event_time + type: time + granularity: day + + metrics: + - name: group_event_count + agg: count + description: Group-slot event rows. 
+ - name: unique_groups + agg: count_distinct + sql: group_key + description: Distinct group keys. + - name: unique_group_slots + agg: count_distinct + sql: group_slot + description: Populated PostHog group slots. + - name: unique_group_types + agg: count_distinct + sql: group_type + description: Distinct configured or event-supplied group types. + - name: unique_grouped_users + agg: count_distinct + sql: actor_id + description: Distinct actors linked to groups. + - name: group_identify_events + agg: count + filters: + - "event_type = '$groupidentify'" + description: Group identify events. diff --git a/models/identity_links.yml b/models/identity_links.yml new file mode 100644 index 0000000..9b0c058 --- /dev/null +++ b/models/identity_links.yml @@ -0,0 +1,118 @@ +models: + - name: identity_links + sql: | + with latest_profiles as ( + select * + from ( + select + *, + row_number() over (partition by person_id order by updated_at desc, version desc, created_at desc) as profile_rank + from {{ persons_table }} + where person_id is not null + ) + where profile_rank = 1 + ), + exploded_links as ( + select + *, + row_number() over (partition by distinct_id order by updated_at desc, version desc, created_at desc) as identity_rank + from latest_profiles, + unnest(distinct_ids) as identity_distinct_ids(distinct_id) + where distinct_id is not null + ) + select + person_id || ':' || distinct_id as identity_link_id, + person_id, + person_int_id, + canonical_distinct_id, + distinct_id, + distinct_id as actor_id, + distinct_id = canonical_distinct_id as is_canonical, + array_length(distinct_ids) as person_distinct_id_count, + source, + operation, + team_id, + api_key, + source_event_uuid, + created_at, + updated_at, + version, + properties, + properties_set_once, + merged_properties + from exploded_links + where identity_rank = 1 + description: One row per PostHog distinct id linked to a Hogflare person profile. 
+ primary_key: identity_link_id + default_time_dimension: updated_at + default_grain: day + + relationships: + - name: person_profiles + type: many_to_one + foreign_key: person_id + primary_key: person_id + - name: persons + type: many_to_one + foreign_key: distinct_id + primary_key: actor_id + + dimensions: + - name: identity_link_id + type: categorical + - name: person_id + type: categorical + - name: person_int_id + type: numeric + - name: canonical_distinct_id + type: categorical + - name: distinct_id + type: categorical + - name: actor_id + type: categorical + - name: is_canonical + type: boolean + - name: person_distinct_id_count + type: numeric + - name: source + type: categorical + - name: operation + type: categorical + - name: team_id + type: numeric + - name: api_key + type: categorical + - name: source_event_uuid + type: categorical + - name: version + type: numeric + - name: created_at + type: time + granularity: day + - name: updated_at + type: time + granularity: day + + metrics: + - name: identity_link_count + agg: count + description: Distinct-id links across person profiles. + - name: resolved_persons + agg: count_distinct + sql: person_id + description: Person profiles with at least one distinct-id link. + - name: linked_actors + agg: count_distinct + sql: distinct_id + description: Distinct PostHog actor ids present in the person identity bridge. + - name: canonical_links + agg: count + filters: + - is_canonical + description: Links where the distinct id is the profile canonical distinct id. + - name: noncanonical_links + agg: count + filters: + - not is_canonical + description: Alias or secondary distinct-id links. 
+ diff --git a/models/metrics.yml b/models/metrics.yml new file mode 100644 index 0000000..d7cd165 --- /dev/null +++ b/models/metrics.yml @@ -0,0 +1,78 @@ +metrics: + - name: identification_rate + type: ratio + numerator: events.known_persons + denominator: events.unique_users + description: Share of active actors that have a resolved Hogflare person id. + + - name: pageviews_per_session + type: ratio + numerator: sessions.total_pageviews + denominator: sessions.session_count + description: Average pageview events per real PostHog SDK session. + + - name: events_per_user + type: ratio + numerator: events.event_count + denominator: events.unique_users + description: Average events per active actor. + + - name: grouped_event_rate + type: ratio + numerator: events.grouped_events + denominator: events.event_count + description: Share of events linked to any group. + + - name: day_1_retention_rate + type: ratio + numerator: first_event_retention.day_1_retained_users + denominator: first_event_retention.cohort_users + description: Share of first-event cohort users active one day later. + + - name: day_7_retention_rate + type: ratio + numerator: first_event_retention.day_7_retained_users + denominator: first_event_retention.cohort_users + description: Share of first-event cohort users active seven days later. + + - name: day_30_retention_rate + type: ratio + numerator: first_event_retention.day_30_retained_users + denominator: first_event_retention.cohort_users + description: Share of first-event cohort users active thirty days later. + + - name: bounce_rate + type: ratio + numerator: sessions.bounced_sessions + denominator: sessions.session_count + description: Share of real PostHog SDK sessions with one or fewer pageviews. + + - name: engagement_rate + type: ratio + numerator: sessions.engaged_sessions + denominator: sessions.session_count + description: Share of real PostHog SDK sessions with multiple events or at least 10 seconds duration. 
+ + - name: fallback_activity_engagement_rate + type: ratio + numerator: activity_days.engaged_activity_days + denominator: activity_days.activity_day_count + description: Share of fallback actor/day rows with multiple events or at least 10 seconds duration. + + - name: identity_links_per_profile + type: ratio + numerator: identity_links.identity_link_count + denominator: identity_links.resolved_persons + description: Average distinct-id links per resolved person profile. + + - name: attributed_actor_rate + type: ratio + numerator: attribution.attributed_actors + denominator: attribution.actor_count + description: Share of active actors with UTM, initial UTM, or referrer attribution. + + - name: group_rows_per_grouped_event + type: ratio + numerator: groups.group_event_count + denominator: events.grouped_events + description: Average populated group slots per grouped event. diff --git a/models/pageviews.yml b/models/pageviews.yml new file mode 100644 index 0000000..aa12af8 --- /dev/null +++ b/models/pageviews.yml @@ -0,0 +1,142 @@ +models: + - name: pageviews + sql: | + with events as ( + select * + from ( + with latest_profiles as ( + select * + from ( + select + *, + row_number() over (partition by person_id order by updated_at desc, version desc, created_at desc) as profile_rank + from {{ persons_table }} + where person_id is not null + ) + where profile_rank = 1 + ), + identity_map as ( + select + resolved_person_id, + canonical_distinct_id, + linked_distinct_id + from ( + select + person_id as resolved_person_id, + canonical_distinct_id, + distinct_id as linked_distinct_id, + row_number() over (partition by distinct_id order by updated_at desc, version desc, created_at desc) as identity_rank + from latest_profiles, + unnest(distinct_ids) as identity_distinct_ids(distinct_id) + where distinct_id is not null + ) + where identity_rank = 1 + ), + source as ( + select + uuid, + event as event_type, + distinct_id, + distinct_id as actor_id, + coalesce(timestamp, 
created_at) as event_time, + properties, + person_id, + coalesce(person_id, identity_map.resolved_person_id) as resolved_person_id, + identity_map.canonical_distinct_id, + group0, + group1, + group2, + group3, + group4, + api_key, + coalesce(json_extract_string(properties, '$.$session_id'), json_extract_string(properties, '$.session_id')) as session_id, + coalesce(json_extract_string(properties, '$.$current_url'), json_extract_string(properties, '$.current_url'), json_extract_string(properties, '$.url')) as current_url, + coalesce(json_extract_string(properties, '$.$pathname'), json_extract_string(properties, '$.pathname'), json_extract_string(properties, '$.path')) as pathname, + coalesce(json_extract_string(properties, '$.$host'), json_extract_string(properties, '$.host')) as host, + coalesce(json_extract_string(properties, '$.$title'), json_extract_string(properties, '$.title')) as page_title, + coalesce(json_extract_string(properties, '$.$referrer'), json_extract_string(properties, '$.referrer')) as referrer, + coalesce(json_extract_string(properties, '$.$utm_source'), json_extract_string(properties, '$.utm_source'), json_extract_string(properties, '$.$initial_utm_source')) as utm_source, + coalesce(json_extract_string(properties, '$.$utm_medium'), json_extract_string(properties, '$.utm_medium'), json_extract_string(properties, '$.$initial_utm_medium')) as utm_medium, + coalesce(json_extract_string(properties, '$.$utm_campaign'), json_extract_string(properties, '$.utm_campaign'), json_extract_string(properties, '$.$initial_utm_campaign')) as utm_campaign, + coalesce(json_extract_string(properties, '$.$browser'), json_extract_string(properties, '$.browser')) as browser, + coalesce(json_extract_string(properties, '$.$os'), json_extract_string(properties, '$.os')) as os, + coalesce(json_extract_string(properties, '$.$device_type'), json_extract_string(properties, '$.device_type')) as device_type, + coalesce(json_extract_string(properties, '$.$geoip_country_code'), 
json_extract_string(properties, '$.country')) as geo_country_code + from {{ events_table }} + left join identity_map on distinct_id = identity_map.linked_distinct_id + ) + select + *, + coalesce(session_id, actor_id || ':' || strftime(event_time, '%Y-%m-%d')) as session_key + from source + ) + where event_type in ('$pageview', 'page_view', '$screen', 'screen') + ) + select * from events + description: Pageview and screen-event view for web/product analytics. + primary_key: uuid + default_time_dimension: event_time + default_grain: day + + dimensions: + - name: uuid + type: categorical + - name: event_type + type: categorical + - name: actor_id + type: categorical + - name: person_id + type: categorical + - name: resolved_person_id + type: categorical + - name: canonical_distinct_id + type: categorical + - name: session_id + type: categorical + - name: session_key + type: categorical + - name: current_url + type: categorical + - name: pathname + type: categorical + - name: host + type: categorical + - name: page_title + type: categorical + - name: referrer + type: categorical + - name: utm_source + type: categorical + - name: utm_medium + type: categorical + - name: utm_campaign + type: categorical + - name: browser + type: categorical + - name: os + type: categorical + - name: device_type + type: categorical + - name: geo_country_code + type: categorical + - name: event_time + type: time + granularity: day + + metrics: + - name: pageviews + agg: count + description: Total pageview and screen events. + - name: unique_visitors + agg: count_distinct + sql: actor_id + description: Distinct actors with pageviews. + - name: unique_sessions + agg: count_distinct + sql: session_key + description: Distinct sessions with pageviews. + - name: known_persons + agg: count_distinct + sql: resolved_person_id + description: Distinct resolved persons with pageviews. 
+ diff --git a/models/person_profiles.yml b/models/person_profiles.yml new file mode 100644 index 0000000..5c3b41f --- /dev/null +++ b/models/person_profiles.yml @@ -0,0 +1,126 @@ +models: + - name: person_profiles + sql: | + with ranked_profiles as ( + select + *, + row_number() over (partition by person_id order by updated_at desc, version desc, created_at desc) as profile_rank + from {{ persons_table }} + where person_id is not null + ), + profile_base as ( + select + uuid, + team_id, + source, + operation, + person_id, + person_int_id, + canonical_distinct_id as actor_id, + canonical_distinct_id, + distinct_ids, + array_length(distinct_ids) as distinct_id_count, + created_at, + updated_at, + version, + properties, + properties_set_once, + merged_properties, + api_key, + source_event_uuid, + coalesce( + json_extract_string(merged_properties, '$.email'), + json_extract_string(properties, '$.email') + ) as email, + nullif(regexp_extract( + coalesce( + json_extract_string(merged_properties, '$.email'), + json_extract_string(properties, '$.email') + ), + '@([^@]+)$', + 1 + ), '') as email_domain, + coalesce( + json_extract_string(merged_properties, '$.plan'), + json_extract_string(properties, '$.plan') + ) as plan, + coalesce( + json_extract_string(merged_properties, '$.name'), + json_extract_string(properties, '$.name') + ) as name + from ranked_profiles + where profile_rank = 1 + ) + select * + from profile_base + description: Latest Hogflare person profile snapshot per person id from the R2 Data Catalog persons table. 
+ primary_key: person_id + default_time_dimension: updated_at + default_grain: day + + relationships: + - name: persons + type: one_to_many + foreign_key: person_id + primary_key: person_id + + dimensions: + - name: uuid + type: categorical + - name: team_id + type: numeric + - name: source + type: categorical + - name: operation + type: categorical + - name: person_id + type: categorical + - name: person_int_id + type: numeric + - name: actor_id + type: categorical + - name: canonical_distinct_id + type: categorical + - name: api_key + type: categorical + - name: source_event_uuid + type: categorical + - name: email_domain + type: categorical + - name: plan + type: categorical + - name: name + type: categorical + - name: distinct_id_count + type: numeric + - name: version + type: numeric + - name: created_at + type: time + granularity: day + - name: updated_at + type: time + granularity: day + + metrics: + - name: profile_count + agg: count + description: Current person profile rows. + - name: unique_actors + agg: count_distinct + sql: actor_id + description: Distinct canonical distinct ids in the person store. + - name: total_distinct_ids + agg: sum + sql: distinct_id_count + description: Total distinct id links across person profiles. + - name: average_profile_version + agg: avg + sql: version + description: Average person profile version. + - name: identified_profiles + agg: count + filters: + - email_domain is not null + description: Person profiles with an email-shaped property. 
+ diff --git a/models/persons.yml b/models/persons.yml new file mode 100644 index 0000000..3cccba7 --- /dev/null +++ b/models/persons.yml @@ -0,0 +1,137 @@ +models: + - name: persons + sql: | + with latest_profiles as ( + select * + from ( + select + *, + row_number() over (partition by person_id order by updated_at desc, version desc, created_at desc) as profile_rank + from {{ persons_table }} + where person_id is not null + ) + where profile_rank = 1 + ), + identity_map as ( + select + resolved_person_id, + canonical_distinct_id, + linked_distinct_id + from ( + select + person_id as resolved_person_id, + canonical_distinct_id, + distinct_id as linked_distinct_id, + row_number() over (partition by distinct_id order by updated_at desc, version desc, created_at desc) as identity_rank + from latest_profiles, + unnest(distinct_ids) as identity_distinct_ids(distinct_id) + where distinct_id is not null + ) + where identity_rank = 1 + ), + event_base as ( + select + distinct_id as actor_id, + person_id, + coalesce(person_id, identity_map.resolved_person_id) as resolved_person_id, + identity_map.canonical_distinct_id, + event as event_type, + coalesce(timestamp, created_at) as event_time, + person_created_at, + person_properties, + properties, + coalesce(json_extract_string(properties, '$.$session_id'), json_extract_string(properties, '$.session_id')) as session_id, + coalesce(json_extract_string(properties, '$.plan'), json_extract_string(person_properties, '$.plan')) as plan, + coalesce(json_extract_string(properties, '$.email'), json_extract_string(person_properties, '$.email')) as email, + coalesce(json_extract_string(properties, '$.$browser'), json_extract_string(properties, '$.browser')) as browser, + coalesce(json_extract_string(properties, '$.$os'), json_extract_string(properties, '$.os')) as os, + coalesce(json_extract_string(properties, '$.$geoip_country_code'), json_extract_string(properties, '$.country')) as geo_country_code + from {{ events_table }} + left 
join identity_map on distinct_id = identity_map.linked_distinct_id + ) + select + actor_id, + max(resolved_person_id) as person_id, + max(canonical_distinct_id) as canonical_distinct_id, + min(coalesce(person_created_at, event_time)) as first_seen_at, + max(event_time) as last_seen_at, + first(event_type order by event_time asc) as first_event_type, + last(event_type order by event_time asc) as last_event_type, + last(plan order by event_time asc) as plan, + nullif(regexp_extract(last(email order by event_time asc), '@([^@]+)$', 1), '') as email_domain, + last(browser order by event_time asc) as browser, + last(os order by event_time asc) as os, + last(geo_country_code order by event_time asc) as geo_country_code, + count(*) as event_count, + count(distinct coalesce(session_id, actor_id || ':' || strftime(event_time, '%Y-%m-%d'))) as session_count + from event_base + group by actor_id + description: Person/actor rollup derived from the event stream. + primary_key: actor_id + default_time_dimension: first_seen_at + default_grain: day + + relationships: + - name: person_profiles + type: many_to_one + foreign_key: person_id + primary_key: person_id + + dimensions: + - name: actor_id + type: categorical + - name: person_id + type: categorical + - name: canonical_distinct_id + type: categorical + - name: first_event_type + type: categorical + - name: last_event_type + type: categorical + - name: plan + type: categorical + - name: email_domain + type: categorical + - name: browser + type: categorical + - name: os + type: categorical + - name: geo_country_code + type: categorical + - name: event_count + type: numeric + - name: session_count + type: numeric + - name: first_seen_at + type: time + granularity: day + - name: last_seen_at + type: time + granularity: day + + metrics: + - name: person_count + agg: count + description: Count of actors/persons. 
+ - name: identified_persons + agg: count + filters: + - person_id is not null + description: Actors with a resolved Hogflare person id. + - name: total_events + agg: sum + sql: event_count + description: Total events across actors. + - name: total_sessions + agg: sum + sql: session_count + description: Total sessions across actors. + - name: average_events_per_person + agg: avg + sql: event_count + description: Average events per actor. + - name: average_sessions_per_person + agg: avg + sql: session_count + description: Average sessions per actor. + diff --git a/models/sessions.yml b/models/sessions.yml new file mode 100644 index 0000000..5a03937 --- /dev/null +++ b/models/sessions.yml @@ -0,0 +1,192 @@ +models: + - name: sessions + sql: | + with latest_profiles as ( + select * + from ( + select + *, + row_number() over (partition by person_id order by updated_at desc, version desc, created_at desc) as profile_rank + from {{ persons_table }} + where person_id is not null + ) + where profile_rank = 1 + ), + identity_map as ( + select + resolved_person_id, + canonical_distinct_id, + linked_distinct_id + from ( + select + person_id as resolved_person_id, + canonical_distinct_id, + distinct_id as linked_distinct_id, + row_number() over (partition by distinct_id order by updated_at desc, version desc, created_at desc) as identity_rank + from latest_profiles, + unnest(distinct_ids) as identity_distinct_ids(distinct_id) + where distinct_id is not null + ) + where identity_rank = 1 + ), + event_base as ( + select + uuid, + event as event_type, + distinct_id as actor_id, + person_id, + coalesce(person_id, identity_map.resolved_person_id) as resolved_person_id, + identity_map.canonical_distinct_id, + coalesce(timestamp, created_at) as event_time, + properties, + api_key, + coalesce(json_extract_string(properties, '$.$session_id'), json_extract_string(properties, '$.session_id')) as session_id, + coalesce(json_extract_string(properties, '$.$pathname'), 
json_extract_string(properties, '$.pathname'), json_extract_string(properties, '$.path')) as pathname, + coalesce(json_extract_string(properties, '$.$current_url'), json_extract_string(properties, '$.current_url'), json_extract_string(properties, '$.url')) as current_url, + coalesce(json_extract_string(properties, '$.$referrer'), json_extract_string(properties, '$.referrer')) as referrer, + coalesce(json_extract_string(properties, '$.$utm_source'), json_extract_string(properties, '$.utm_source'), json_extract_string(properties, '$.$initial_utm_source')) as utm_source, + coalesce(json_extract_string(properties, '$.$utm_medium'), json_extract_string(properties, '$.utm_medium'), json_extract_string(properties, '$.$initial_utm_medium')) as utm_medium, + coalesce(json_extract_string(properties, '$.$utm_campaign'), json_extract_string(properties, '$.utm_campaign'), json_extract_string(properties, '$.$initial_utm_campaign')) as utm_campaign, + coalesce(json_extract_string(properties, '$.$browser'), json_extract_string(properties, '$.browser')) as browser, + coalesce(json_extract_string(properties, '$.$os'), json_extract_string(properties, '$.os')) as os, + coalesce(json_extract_string(properties, '$.$device_type'), json_extract_string(properties, '$.device_type')) as device_type, + coalesce(json_extract_string(properties, '$.$geoip_country_code'), json_extract_string(properties, '$.country')) as geo_country_code + from {{ events_table }} + left join identity_map on distinct_id = identity_map.linked_distinct_id + where coalesce(timestamp, created_at) is not null + ), + sessionized as ( + select + * + from event_base + where session_id is not null + ) + select + session_id, + session_id as session_key, + first(actor_id order by event_time asc) as actor_id, + first(resolved_person_id order by event_time asc) as person_id, + first(canonical_distinct_id order by event_time asc) as canonical_distinct_id, + true as has_session_id, + first(api_key order by event_time asc) as 
api_key, + min(event_time) as session_start_at, + max(event_time) as session_end_at, + date_diff('second', min(event_time), max(event_time)) as duration_seconds, + count(*) as event_count, + sum(case when event_type in ('$pageview', 'page_view', '$screen', 'screen') then 1 else 0 end) as pageview_count, + sum(case when event_type = '$autocapture' then 1 else 0 end) as autocapture_count, + first(pathname order by event_time asc) as landing_path, + last(pathname order by event_time asc) as exit_path, + first(current_url order by event_time asc) as landing_url, + last(current_url order by event_time asc) as exit_url, + first(referrer order by event_time asc) as referrer, + first(utm_source order by event_time asc) as utm_source, + first(utm_medium order by event_time asc) as utm_medium, + first(utm_campaign order by event_time asc) as utm_campaign, + first(browser order by event_time asc) as browser, + first(os order by event_time asc) as os, + first(device_type order by event_time asc) as device_type, + first(geo_country_code order by event_time asc) as geo_country_code + from sessionized + group by session_id + description: Real PostHog SDK session rollup keyed only by captured $session_id. 
+ primary_key: session_id + default_time_dimension: session_start_at + default_grain: day + + relationships: + - name: persons + type: many_to_one + foreign_key: actor_id + primary_key: actor_id + + dimensions: + - name: session_key + type: categorical + - name: session_id + type: categorical + - name: actor_id + type: categorical + - name: person_id + type: categorical + - name: canonical_distinct_id + type: categorical + - name: has_session_id + type: boolean + - name: api_key + type: categorical + - name: landing_path + type: categorical + - name: exit_path + type: categorical + - name: landing_url + type: categorical + - name: exit_url + type: categorical + - name: referrer + type: categorical + - name: utm_source + type: categorical + - name: utm_medium + type: categorical + - name: utm_campaign + type: categorical + - name: browser + type: categorical + - name: os + type: categorical + - name: device_type + type: categorical + - name: geo_country_code + type: categorical + - name: duration_seconds + type: numeric + - name: event_count + type: numeric + - name: pageview_count + type: numeric + - name: session_start_at + type: time + granularity: day + - name: session_end_at + type: time + granularity: day + + metrics: + - name: session_count + agg: count + description: Total real PostHog SDK sessions. + - name: sdk_session_count + agg: count + description: Compatibility alias for real PostHog SDK sessions. + - name: unique_users + agg: count_distinct + sql: actor_id + description: Distinct actors with sessions. + - name: bounced_sessions + agg: count + filters: + - pageview_count <= 1 + description: Activity-session rollups with one or fewer pageviews. + - name: engaged_sessions + agg: count + filters: + - event_count > 1 or duration_seconds >= 10 + description: Activity-session rollups with multiple events or at least 10 seconds duration. + - name: total_session_seconds + agg: sum + sql: duration_seconds + description: Total session duration in seconds. 
+ - name: average_session_seconds + agg: avg + sql: duration_seconds + description: Average session duration in seconds. + - name: total_pageviews + agg: sum + sql: pageview_count + description: Total pageviews in sessions. + - name: average_pageviews_per_session + agg: avg + sql: pageview_count + description: Average pageviews per session. + diff --git a/src/bin/import_posthog.rs b/src/bin/import_posthog.rs index 2abd87f..3d6729b 100644 --- a/src/bin/import_posthog.rs +++ b/src/bin/import_posthog.rs @@ -25,8 +25,13 @@ async fn run() -> Result<(), ImportError> { let mode = if dry_run { "dry run" } else { "import" }; println!( - "PostHog {mode} complete: persons={}, groups={}, events={}, skipped={}, pipeline_batches={}", - summary.persons, summary.groups, summary.events, summary.skipped, summary.pipeline_batches + "PostHog {mode} complete: persons={}, person_snapshots={}, groups={}, events={}, skipped={}, pipeline_batches={}", + summary.persons, + summary.person_snapshots, + summary.groups, + summary.events, + summary.skipped, + summary.pipeline_batches ); Ok(()) diff --git a/src/importer.rs b/src/importer.rs index 6e3e749..4df291e 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -14,13 +14,14 @@ use thiserror::Error; use uuid::Uuid; use crate::groups::GroupTypeMap; -use crate::pipeline::{PipelineClient, PipelineError, PipelineEvent}; +use crate::pipeline::{PersonPipelineRecord, PipelineClient, PipelineError, PipelineEvent}; const DEFAULT_POSTHOG_HOST: &str = "https://us.posthog.com"; const DEFAULT_BATCH_SIZE: usize = 500; const DEFAULT_TIMEOUT_SECS: u64 = 30; const DEFAULT_IMPORT_STATE_FILE: &str = ".hogflare-import-state.jsonl"; const DEFAULT_TARGET_TABLE: &str = "default.hogflare_events_v3"; +const DEFAULT_PERSONS_TARGET_TABLE: &str = "default.hogflare_persons_v2"; const DEFAULT_PIPELINE_FLUSH_SECS: u64 = 300; const MIN_TARGET_WAIT_SECS: u64 = 60; const TARGET_WAIT_GRACE_SECS: u64 = 30; @@ -35,6 +36,8 @@ pub struct ImportConfig { pub 
posthog_personal_api_key: String, pub pipeline_endpoint: Url, pub pipeline_auth_token: Option, + pub persons_pipeline_endpoint: Option, + pub persons_pipeline_auth_token: Option, pub pipeline_timeout: Duration, pub hogflare_api_key: Option, pub posthog_team_id: Option, @@ -61,6 +64,7 @@ pub struct ImportConfig { pub target_account_id: Option, pub target_bucket: Option, pub target_table: String, + pub persons_target_table: String, pub target_auth_token: Option, pub target_wait: Duration, pub target_poll: Duration, @@ -113,6 +117,17 @@ impl ImportConfig { let pipeline_auth_token = args .pipeline_auth_token .or_else(|| env_var("CLOUDFLARE_PIPELINE_AUTH_TOKEN")); + let persons_pipeline_endpoint = args + .persons_pipeline_endpoint + .or_else(|| env_var("IMPORT_PERSONS_PIPELINE_ENDPOINT")) + .or_else(|| env_var("CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT")) + .map(|value| parse_url("CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT", value)) + .transpose()?; + let persons_pipeline_auth_token = args + .persons_pipeline_auth_token + .or_else(|| env_var("IMPORT_PERSONS_PIPELINE_AUTH_TOKEN")) + .or_else(|| env_var("CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN")) + .or_else(|| pipeline_auth_token.clone()); let pipeline_timeout = parse_duration_secs( "CLOUDFLARE_PIPELINE_TIMEOUT_SECS", args.pipeline_timeout_secs @@ -275,6 +290,13 @@ impl ImportConfig { .or_else(|| env_var("R2_SQL_TABLE")) .unwrap_or_else(|| DEFAULT_TARGET_TABLE.to_string()), )?; + let persons_target_table = parse_target_table( + "IMPORT_PERSONS_TARGET_TABLE", + args.persons_target_table + .or_else(|| env_var("IMPORT_PERSONS_TARGET_TABLE")) + .or_else(|| env_var("CLOUDFLARE_PERSONS_TARGET_TABLE")) + .unwrap_or_else(|| DEFAULT_PERSONS_TARGET_TABLE.to_string()), + )?; let pipeline_flush = parse_optional_duration_secs( "IMPORT_PIPELINE_FLUSH_SECS", args.pipeline_flush_secs @@ -339,6 +361,8 @@ impl ImportConfig { posthog_personal_api_key, pipeline_endpoint, pipeline_auth_token, + persons_pipeline_endpoint, + persons_pipeline_auth_token, 
pipeline_timeout, hogflare_api_key, posthog_team_id, @@ -365,6 +389,7 @@ impl ImportConfig { target_account_id, target_bucket, target_table, + persons_target_table, target_auth_token, target_wait, target_poll, @@ -393,6 +418,8 @@ struct ImportArgs { posthog_personal_api_key: Option, pipeline_endpoint: Option, pipeline_auth_token: Option, + persons_pipeline_endpoint: Option, + persons_pipeline_auth_token: Option, pipeline_timeout_secs: Option, hogflare_api_key: Option, team_id: Option, @@ -420,6 +447,7 @@ struct ImportArgs { target_account_id: Option, target_bucket: Option, target_table: Option, + persons_target_table: Option, target_auth_token: Option, target_wait_secs: Option, pipeline_flush_secs: Option, @@ -456,6 +484,12 @@ impl ImportArgs { "--pipeline-auth-token" => { parsed.pipeline_auth_token = Some(next_arg(&arg, &mut args)?); } + "--persons-pipeline-endpoint" => { + parsed.persons_pipeline_endpoint = Some(next_arg(&arg, &mut args)?); + } + "--persons-pipeline-auth-token" => { + parsed.persons_pipeline_auth_token = Some(next_arg(&arg, &mut args)?); + } "--pipeline-timeout-secs" => { parsed.pipeline_timeout_secs = Some(next_arg(&arg, &mut args)?); } @@ -507,6 +541,9 @@ impl ImportArgs { "--target-table" => { parsed.target_table = Some(next_arg(&arg, &mut args)?); } + "--persons-target-table" => { + parsed.persons_target_table = Some(next_arg(&arg, &mut args)?); + } "--target-auth-token" => { parsed.target_auth_token = Some(next_arg(&arg, &mut args)?); } @@ -686,12 +723,23 @@ fn parse_datetime(value: &str) -> Option> { } fn deterministic_import_uuid(parts: &[&str]) -> String { + deterministic_import_uuid_value(parts).to_string() +} + +fn deterministic_import_uuid_value(parts: &[&str]) -> Uuid { let mut name = String::from("hogflare:posthog-import"); for part in parts { name.push('\u{1f}'); name.push_str(part); } - Uuid::new_v5(&Uuid::NAMESPACE_URL, name.as_bytes()).to_string() + Uuid::new_v5(&Uuid::NAMESPACE_URL, name.as_bytes()) +} + +fn 
stable_import_int_id(parts: &[&str]) -> i64 { + let uuid = deterministic_import_uuid_value(parts); + let mut bytes = [0_u8; 8]; + bytes.copy_from_slice(&uuid.as_bytes()[..8]); + i64::from_be_bytes(bytes) & i64::MAX } fn target_wait_for_flush(flush_secs: u64) -> Duration { @@ -806,6 +854,7 @@ async fn align_target_wait_to_pipeline_flush(config: &mut ImportConfig) -> Resul #[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct ImportSummary { pub persons: usize, + pub person_snapshots: usize, pub groups: usize, pub events: usize, pub skipped: usize, @@ -886,6 +935,7 @@ pub enum ImportError { enum ImportKind { Event, Person, + PersonSnapshot, Group, } @@ -894,6 +944,7 @@ impl ImportKind { match self { Self::Event => "event", Self::Person => "person", + Self::PersonSnapshot => "person_snapshot", Self::Group => "group", } } @@ -926,6 +977,15 @@ impl ImportKey { } } + fn person_snapshot(uuid: String, person_id: String) -> Self { + Self { + uuid, + kind: ImportKind::PersonSnapshot, + logical_key: person_id, + group_type: None, + } + } + fn group(uuid: String, group_type: String, group_key: String) -> Self { Self { uuid, @@ -939,6 +999,7 @@ impl ImportKey { match self.kind { ImportKind::Event => format!("event\t{}", self.logical_key), ImportKind::Person => format!("person\t{}", self.logical_key), + ImportKind::PersonSnapshot => format!("person_snapshot\t{}", self.logical_key), ImportKind::Group => format!( "group\t{}\t{}", self.group_type.as_deref().unwrap_or_default(), @@ -963,6 +1024,12 @@ struct ImportBatchItem { event: PipelineEvent, } +#[derive(Debug, Clone)] +struct PersonSnapshotBatchItem { + key: ImportKey, + record: PersonPipelineRecord, +} + #[derive(Debug, Clone, Serialize, Deserialize)] struct ImportStateRecord { kind: String, @@ -978,6 +1045,7 @@ impl ImportStateRecord { match self.kind.as_str() { "event" => Some(format!("event\t{}", self.key)), "person" => Some(format!("person\t{}", self.key)), + "person_snapshot" => Some(format!("person_snapshot\t{}", 
self.key)), "group" => Some(format!( "group\t{}\t{}", self.group_type.as_deref().unwrap_or_default(), @@ -1090,6 +1158,13 @@ struct R2SqlTarget { impl R2SqlTarget { fn from_config(config: &ImportConfig) -> Result, ImportError> { + Self::from_config_table(config, config.target_table.clone()) + } + + fn from_config_table( + config: &ImportConfig, + table: String, + ) -> Result, ImportError> { if !config.target_checks_enabled { return Ok(None); } @@ -1114,7 +1189,7 @@ impl R2SqlTarget { client, endpoint, auth_token, - table: config.target_table.clone(), + table, })) } @@ -1126,6 +1201,16 @@ impl R2SqlTarget { Ok(existing) } + async fn existing_person_snapshots( + &self, + keys: &[ImportKey], + ) -> Result, ImportError> { + let mut existing = HashSet::new(); + self.mark_existing_person_snapshots(keys, &mut existing) + .await?; + Ok(existing) + } + async fn wait_for_existing_keys( &self, keys: &[ImportKey], @@ -1148,6 +1233,28 @@ impl R2SqlTarget { } } + async fn wait_for_person_snapshots( + &self, + keys: &[ImportKey], + wait: Duration, + poll: Duration, + ) -> Result, ImportError> { + let started = Instant::now(); + loop { + let existing = self.existing_person_snapshots(keys).await?; + if existing.len() == keys.len() || started.elapsed() >= wait { + return Ok(existing); + } + + let remaining = wait.saturating_sub(started.elapsed()); + let sleep_for = remaining.min(poll); + if sleep_for.is_zero() { + return Ok(existing); + } + tokio::time::sleep(sleep_for).await; + } + } + async fn mark_existing_by_uuid( &self, keys: &[ImportKey], @@ -1255,6 +1362,37 @@ impl R2SqlTarget { Ok(()) } + async fn mark_existing_person_snapshots( + &self, + keys: &[ImportKey], + existing: &mut HashSet, + ) -> Result<(), ImportError> { + let mut keys_by_person_id: HashMap = HashMap::new(); + for key in keys + .iter() + .filter(|key| matches!(key.kind, ImportKind::PersonSnapshot)) + { + keys_by_person_id.insert(key.logical_key.clone(), key.state_key()); + } + let person_ids = 
keys_by_person_id.keys().cloned().collect::>(); + for chunk in person_ids.chunks(500) { + let person_id_list = sql_string_list(chunk); + let query = format!( + "select person_id from {} where source = 'posthog' and person_id in ({person_id_list}) group by person_id limit {}", + self.table, + chunk.len() + ); + for row in self.query_rows(query).await? { + if let Some(person_id) = row_string(&row, "person_id") { + if let Some(state_key) = keys_by_person_id.get(&person_id) { + existing.insert(state_key.clone()); + } + } + } + } + Ok(()) + } + async fn query_rows(&self, query: String) -> Result, ImportError> { let response = self .client @@ -1437,7 +1575,15 @@ pub async fn run_import(config: ImportConfig) -> Result Some(PipelineClient::new_without_retries( + endpoint, + config.persons_pipeline_auth_token.clone(), + config.pipeline_timeout, + )?), + None => None, + }; + let mut importer = Importer::new(config, client, pipeline, persons_pipeline)?; importer.run().await } @@ -1446,11 +1592,13 @@ struct Importer { config: ImportConfig, posthog: PostHogClient, pipeline: PipelineClient, + persons_pipeline: Option, group_type_map: GroupTypeMap, persons_by_distinct_id: HashMap, group_properties: HashMap<(String, String), Map>, import_state: ImportState, target: Option, + persons_target: Option, summary: ImportSummary, } @@ -1459,19 +1607,27 @@ impl Importer { config: ImportConfig, posthog: PostHogClient, pipeline: PipelineClient, + persons_pipeline: Option, ) -> Result { let group_type_map = GroupTypeMap::new(config.posthog_group_types.clone()); let import_state = ImportState::load(config.import_state_file.clone())?; let target = R2SqlTarget::from_config(&config)?; + let persons_target = if persons_pipeline.is_some() { + R2SqlTarget::from_config_table(&config, config.persons_target_table.clone())? 
+ } else { + None + }; Ok(Self { config, posthog, pipeline, + persons_pipeline, group_type_map, persons_by_distinct_id: HashMap::new(), group_properties: HashMap::new(), import_state, target, + persons_target, summary: ImportSummary::default(), }) } @@ -1493,7 +1649,8 @@ impl Importer { } async fn import_persons(&mut self) -> Result<(), ImportError> { - let mut buffer = Vec::new(); + let mut event_buffer = Vec::new(); + let mut snapshot_buffer = Vec::new(); let mut next = self .next_limit(self.config.max_persons, self.summary.persons) .map(|limit| self.posthog.persons_url(limit, self.config.persons_offset)) @@ -1513,15 +1670,33 @@ impl Importer { if self.config.emit_persons { if let Some(event) = person.to_pipeline_event(&self.config) { - buffer.push(ImportBatchItem { + event_buffer.push(ImportBatchItem { key: ImportKey::person(event.uuid.clone(), event.distinct_id.clone()), event, }); } } - if buffer.len() >= self.config.batch_size { - self.send(std::mem::take(&mut buffer)).await?; + if self.persons_pipeline.is_some() { + if let Some(record) = person.to_person_pipeline_record(&self.config) { + snapshot_buffer.push(PersonSnapshotBatchItem { + key: ImportKey::person_snapshot( + record.uuid.clone(), + record.person_id.clone(), + ), + record, + }); + } + } + + if event_buffer.len() >= self.config.batch_size { + self.send(std::mem::take(&mut event_buffer)).await?; + } + if snapshot_buffer.len() >= self.config.batch_size { + let sent = self + .send_person_snapshots(std::mem::take(&mut snapshot_buffer)) + .await?; + self.summary.person_snapshots += sent; } self.summary.persons += 1; @@ -1537,7 +1712,10 @@ impl Importer { } } - self.send(buffer).await.map(|_| ()) + self.send(event_buffer).await?; + let sent = self.send_person_snapshots(snapshot_buffer).await?; + self.summary.person_snapshots += sent; + Ok(()) } async fn import_groups(&mut self) -> Result<(), ImportError> { @@ -1906,6 +2084,44 @@ impl Importer { } } + async fn send_person_snapshots( + &mut self, + items: 
Vec, + ) -> Result { + let items = self.filter_existing_person_snapshots(items).await?; + if items.is_empty() { + return Ok(0); + } + + let keys = items + .iter() + .map(|item| item.key.clone()) + .collect::>(); + if self.config.dry_run { + self.summary.pipeline_batches += 1; + return Ok(items.len()); + } + + let records = items + .iter() + .map(|item| item.record.clone()) + .collect::>(); + let Some(pipeline) = self.persons_pipeline.as_ref() else { + return Ok(0); + }; + match pipeline.send_records(records).await { + Ok(()) => { + self.import_state.record(&keys)?; + self.summary.pipeline_batches += 1; + Ok(keys.len()) + } + Err(source) => { + self.handle_ambiguous_person_snapshot_send(keys, source) + .await + } + } + } + async fn filter_existing( &mut self, items: Vec, @@ -1950,6 +2166,50 @@ impl Importer { Ok(filtered) } + async fn filter_existing_person_snapshots( + &mut self, + items: Vec, + ) -> Result, ImportError> { + let mut batch_seen = HashSet::new(); + let mut filtered = Vec::with_capacity(items.len()); + let mut skipped = 0; + + for item in items { + let state_key = item.key.state_key(); + if self.import_state.contains(&item.key) || !batch_seen.insert(state_key) { + skipped += 1; + } else { + filtered.push(item); + } + } + + if let Some(target) = self.persons_target.as_ref() { + let keys = filtered + .iter() + .map(|item| item.key.clone()) + .collect::>(); + let existing = target.existing_person_snapshots(&keys).await?; + if !existing.is_empty() { + let mut existing_keys = Vec::new(); + filtered.retain(|item| { + if existing.contains(&item.key.state_key()) { + existing_keys.push(item.key.clone()); + false + } else { + true + } + }); + skipped += existing_keys.len(); + if !self.config.dry_run { + self.import_state.record(&existing_keys)?; + } + } + } + + self.summary.skipped += skipped; + Ok(filtered) + } + async fn handle_ambiguous_send( &mut self, keys: Vec, @@ -1987,6 +2247,44 @@ impl Importer { source, }) } + + async fn 
handle_ambiguous_person_snapshot_send( + &mut self, + keys: Vec, + source: PipelineError, + ) -> Result { + let Some(target) = self.persons_target.as_ref() else { + return Err(ImportError::AmbiguousPipelineCommit { + confirmed: 0, + total: keys.len(), + wait_secs: 0, + source, + }); + }; + + let existing = target + .wait_for_person_snapshots(&keys, self.config.target_wait, self.config.target_poll) + .await?; + let confirmed = keys + .iter() + .filter(|key| existing.contains(&key.state_key())) + .cloned() + .collect::>(); + if !confirmed.is_empty() { + self.import_state.record(&confirmed)?; + } + if confirmed.len() == keys.len() { + self.summary.pipeline_batches += 1; + return Ok(confirmed.len()); + } + + Err(ImportError::AmbiguousPipelineCommit { + confirmed: confirmed.len(), + total: keys.len(), + wait_secs: self.config.target_wait.as_secs(), + source, + }) + } } #[derive(Clone)] @@ -2338,12 +2636,24 @@ impl PostHogPerson { .or_else(|| self.id.as_ref().and_then(value_to_string)) } + fn person_key(&self) -> Option { + self.posthog_person_id() + .or_else(|| self.primary_distinct_id()) + } + + fn posthog_person_id(&self) -> Option { + self.uuid + .clone() + .or_else(|| self.id.as_ref().and_then(value_to_string)) + } + + fn person_int_id(&self) -> Option { + self.id.as_ref().and_then(value_to_i64) + } + fn snapshot(&self) -> Option { Some(ImportedPersonSnapshot { - person_id: self - .uuid - .clone() - .or_else(|| self.id.as_ref().and_then(value_to_string)), + person_id: self.person_key(), created_at: self.created_at, properties: self.properties.clone(), }) @@ -2351,10 +2661,7 @@ impl PostHogPerson { fn to_pipeline_event(&self, config: &ImportConfig) -> Option { let distinct_id = self.primary_distinct_id()?; - let person_id = self - .uuid - .clone() - .or_else(|| self.id.as_ref().and_then(value_to_string)); + let person_id = self.posthog_person_id(); let person_key = person_id.clone().unwrap_or_else(|| distinct_id.clone()); let import_uuid = 
deterministic_import_uuid(&[ &config.posthog_project_id, @@ -2410,6 +2717,60 @@ impl PostHogPerson { extra, }) } + + fn to_person_pipeline_record(&self, config: &ImportConfig) -> Option { + let canonical_distinct_id = self.primary_distinct_id()?; + let person_id = self.person_key()?; + let created_at = self.created_at.unwrap_or_else(Utc::now); + let source_event_uuid = deterministic_import_uuid(&[ + &config.posthog_project_id, + config.posthog_environment_id.as_deref().unwrap_or(""), + "person", + &person_id, + ]); + let uuid = deterministic_import_uuid(&[ + &config.posthog_project_id, + config.posthog_environment_id.as_deref().unwrap_or(""), + "person_snapshot", + &person_id, + ]); + let distinct_ids = if self.distinct_ids.is_empty() { + vec![canonical_distinct_id.clone()] + } else { + self.distinct_ids.clone() + }; + let properties = self + .properties + .clone() + .unwrap_or_else(|| Value::Object(Map::new())); + let person_int_id = self.person_int_id().unwrap_or_else(|| { + stable_import_int_id(&[ + &config.posthog_project_id, + config.posthog_environment_id.as_deref().unwrap_or(""), + "person", + &person_id, + ]) + }); + + Some(PersonPipelineRecord { + uuid, + team_id: config.posthog_team_id, + source: "posthog", + operation: "import".to_string(), + person_id, + person_int_id, + canonical_distinct_id, + distinct_ids: Value::Array(distinct_ids.into_iter().map(Value::String).collect()), + created_at, + updated_at: Utc::now(), + version: 1, + properties: properties.clone(), + properties_set_once: Value::Object(Map::new()), + merged_properties: properties, + api_key: config.hogflare_api_key.clone(), + source_event_uuid, + }) + } } #[derive(Debug, Clone)] @@ -2575,6 +2936,14 @@ fn value_to_string(value: &Value) -> Option { } } +fn value_to_i64(value: &Value) -> Option { + match value { + Value::Number(value) => value.as_i64(), + Value::String(value) => value.parse().ok(), + _ => None, + } +} + fn value_to_datetime(value: &Value) -> Option> { 
value.as_str().and_then(parse_datetime) } @@ -2606,6 +2975,8 @@ fn usage() -> String { " --posthog-host https://us.posthog.com", " --environment-id ", " --pipeline-auth-token ", + " --persons-pipeline-endpoint ", + " --persons-pipeline-auth-token ", " --hogflare-api-key ", " --from 2025-01-01", " --to 2025-02-01", @@ -2621,6 +2992,7 @@ fn usage() -> String { " --target-account-id ", " --target-bucket ", " --target-table default.hogflare_events_v3", + " --persons-target-table default.hogflare_persons_v2", " --target-auth-token ", " --target-wait-secs ", " --pipeline-flush-secs 300", @@ -2646,6 +3018,8 @@ mod tests { posthog_personal_api_key: "phx_test".to_string(), pipeline_endpoint: Url::parse("http://127.0.0.1:1/").unwrap(), pipeline_auth_token: None, + persons_pipeline_endpoint: None, + persons_pipeline_auth_token: None, pipeline_timeout: Duration::from_secs(1), hogflare_api_key: Some("phc_test".to_string()), posthog_team_id: Some(42), @@ -2672,6 +3046,7 @@ mod tests { target_account_id: None, target_bucket: None, target_table: DEFAULT_TARGET_TABLE.to_string(), + persons_target_table: DEFAULT_PERSONS_TARGET_TABLE.to_string(), target_auth_token: None, target_wait: target_wait_for_flush(DEFAULT_PIPELINE_FLUSH_SECS), target_poll: target_poll_for_flush(DEFAULT_PIPELINE_FLUSH_SECS), @@ -2717,6 +3092,10 @@ mod tests { "phx_test", "--pipeline-endpoint", "http://127.0.0.1:1/", + "--persons-pipeline-endpoint", + "http://127.0.0.1:2/", + "--persons-pipeline-auth-token", + "persons-token", "--dry-run", "--persons-offset", "10", @@ -2751,6 +3130,12 @@ mod tests { "phx_test", "--pipeline-endpoint", "http://127.0.0.1:1/", + "--persons-pipeline-endpoint", + "http://127.0.0.1:2/", + "--persons-pipeline-auth-token", + "persons-token", + "--persons-target-table", + "default.custom_persons", "--dry-run", "--persons-offset", "10", @@ -2787,6 +3172,15 @@ mod tests { config.events_after_uuid.as_deref(), Some("0192129b-c354-77b4-b496-9be7ec571fb4") ); + assert_eq!( + 
config.persons_pipeline_endpoint.as_ref().map(Url::as_str), + Some("http://127.0.0.1:2/") + ); + assert_eq!( + config.persons_pipeline_auth_token.as_deref(), + Some("persons-token") + ); + assert_eq!(config.persons_target_table, "default.custom_persons"); assert_eq!(config.event_uuids_file.as_deref(), Some("/tmp/missing.txt")); assert_eq!(config.event_window_days, None); assert_eq!(config.event_window_hours, Some(6)); @@ -2903,6 +3297,41 @@ mod tests { assert_eq!(first.uuid, second.uuid); } + #[test] + fn person_import_snapshot_uses_stable_uuid_and_schema() { + let config = base_config(); + let person: PostHogPerson = serde_json::from_value(json!({ + "id": 7, + "uuid": "person-uuid", + "distinct_ids": ["user-1", "anon-1"], + "properties": {"email": "u@example.com"}, + "created_at": "2025-01-02T03:04:05Z" + })) + .unwrap(); + + let event = person.to_pipeline_event(&config).unwrap(); + let first = person.to_person_pipeline_record(&config).unwrap(); + let second = person.to_person_pipeline_record(&config).unwrap(); + + assert_eq!(first.uuid, second.uuid); + assert_eq!(first.source, "posthog"); + assert_eq!(first.operation, "import"); + assert_eq!(first.person_id, "person-uuid"); + assert_eq!(first.person_int_id, 7); + assert_eq!(first.canonical_distinct_id, "user-1"); + assert_eq!(first.distinct_ids, json!(["user-1", "anon-1"])); + assert_eq!( + first.created_at, + parse_datetime("2025-01-02T03:04:05Z").unwrap() + ); + assert_eq!(first.version, 1); + assert_eq!(first.properties, json!({"email": "u@example.com"})); + assert_eq!(first.properties_set_once, json!({})); + assert_eq!(first.merged_properties, json!({"email": "u@example.com"})); + assert_eq!(first.api_key.as_deref(), Some("phc_test")); + assert_eq!(first.source_event_uuid, event.uuid); + } + #[test] fn client_prefers_environment_scoped_person_and_group_urls() { let config = base_config(); @@ -2985,6 +3414,7 @@ mod tests { let keys = vec![ ImportKey::event("event-uuid".to_string()), 
ImportKey::person("person-event-uuid".to_string(), "user-1".to_string()), + ImportKey::person_snapshot("person-snapshot-uuid".to_string(), "person-1".to_string()), ImportKey::group( "group-event-uuid".to_string(), "company".to_string(), @@ -3018,7 +3448,7 @@ mod tests { Duration::from_secs(1), ) .unwrap(); - let mut importer = Importer::new(config, posthog, pipeline).unwrap(); + let mut importer = Importer::new(config, posthog, pipeline, None).unwrap(); importer.persons_by_distinct_id.insert( "user-1".to_string(), ImportedPersonSnapshot {