Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/import-posthog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Hogflare includes a host-side importer for backfilling an existing PostHog project into the same Cloudflare Pipeline sink used by the Worker. It reads PostHog's private API with a personal API key, then writes normalized rows to the pipeline:

- persons as `$identify` rows
- persons as `$identify` event rows and, when configured, `PersonPipelineRecord` snapshots
- groups as `$groupidentify` rows
- historical events from HogQL with original `timestamp`, `created_at`, and PostHog event `uuid` when available

Expand Down Expand Up @@ -42,7 +42,10 @@ export IMPORT_STATE_FILE=".hogflare-import-state.jsonl"
export IMPORT_TARGET_ACCOUNT_ID="<cloudflare_account_id>"
export IMPORT_TARGET_BUCKET="<r2_bucket>"
export IMPORT_TARGET_TABLE="default.hogflare_events_v3"
export IMPORT_PERSONS_TARGET_TABLE="default.hogflare_persons_v2"
export WRANGLER_R2_SQL_AUTH_TOKEN="<r2 sql token>"
export CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT="https://<persons-stream-id>.ingest.cloudflare.com"
export CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN="<persons pipeline token>" # falls back to event pipeline token
export IMPORT_CLOUDFLARE_API_TOKEN="<token with Pipelines read>" # optional auto flush discovery
export IMPORT_PIPELINE_FLUSH_SECS="300" # fallback if Pipelines read is unavailable
```
Expand Down Expand Up @@ -75,6 +78,8 @@ cargo run --bin import_posthog -- \
--personal-api-key "$POSTHOG_PERSONAL_API_KEY" \
--pipeline-endpoint "$CLOUDFLARE_PIPELINE_ENDPOINT" \
--pipeline-auth-token "$CLOUDFLARE_PIPELINE_AUTH_TOKEN" \
--persons-pipeline-endpoint "$CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT" \
--persons-pipeline-auth-token "$CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN" \
--hogflare-api-key phc_example \
--from 2025-01-01 \
--to 2025-02-01 \
Expand All @@ -91,6 +96,7 @@ cargo run --bin import_posthog -- \
--target-account-id "$CLOUDFLARE_ACCOUNT_ID" \
--target-bucket hogflare \
--target-table default.hogflare_events_v3 \
--persons-target-table default.hogflare_persons_v2 \
--target-auth-token "$WRANGLER_R2_SQL_AUTH_TOKEN" \
--cloudflare-api-token "$CLOUDFLARE_API_TOKEN"
```
Expand Down
183 changes: 183 additions & 0 deletions models/activity_days.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
models:
- name: activity_days
sql: |
with latest_profiles as (
select *
from (
select
*,
row_number() over (partition by person_id order by updated_at desc, version desc, created_at desc) as profile_rank
from {{ persons_table }}
where person_id is not null
)
where profile_rank = 1
),
identity_map as (
select
resolved_person_id,
canonical_distinct_id,
linked_distinct_id
from (
select
person_id as resolved_person_id,
canonical_distinct_id,
distinct_id as linked_distinct_id,
row_number() over (partition by distinct_id order by updated_at desc, version desc, created_at desc) as identity_rank
from latest_profiles,
unnest(distinct_ids) as identity_distinct_ids(distinct_id)
where distinct_id is not null
)
where identity_rank = 1
),
event_base as (
select
uuid,
event as event_type,
distinct_id as actor_id,
person_id,
coalesce(person_id, identity_map.resolved_person_id) as resolved_person_id,
identity_map.canonical_distinct_id,
coalesce(timestamp, created_at) as event_time,
date_trunc('day', coalesce(timestamp, created_at)) as activity_day,
properties,
api_key,
coalesce(json_extract_string(properties, '$.$session_id'), json_extract_string(properties, '$.session_id')) as session_id,
coalesce(json_extract_string(properties, '$.$pathname'), json_extract_string(properties, '$.pathname'), json_extract_string(properties, '$.path')) as pathname,
coalesce(json_extract_string(properties, '$.$current_url'), json_extract_string(properties, '$.current_url'), json_extract_string(properties, '$.url')) as current_url,
coalesce(json_extract_string(properties, '$.$referrer'), json_extract_string(properties, '$.referrer')) as referrer,
coalesce(json_extract_string(properties, '$.$utm_source'), json_extract_string(properties, '$.utm_source'), json_extract_string(properties, '$.$initial_utm_source')) as utm_source,
coalesce(json_extract_string(properties, '$.$utm_medium'), json_extract_string(properties, '$.utm_medium'), json_extract_string(properties, '$.$initial_utm_medium')) as utm_medium,
coalesce(json_extract_string(properties, '$.$utm_campaign'), json_extract_string(properties, '$.utm_campaign'), json_extract_string(properties, '$.$initial_utm_campaign')) as utm_campaign,
coalesce(json_extract_string(properties, '$.$browser'), json_extract_string(properties, '$.browser')) as browser,
coalesce(json_extract_string(properties, '$.$os'), json_extract_string(properties, '$.os')) as os,
coalesce(json_extract_string(properties, '$.$device_type'), json_extract_string(properties, '$.device_type')) as device_type,
coalesce(json_extract_string(properties, '$.$geoip_country_code'), json_extract_string(properties, '$.country')) as geo_country_code
from {{ events_table }}
left join identity_map on distinct_id = identity_map.linked_distinct_id
where coalesce(timestamp, created_at) is not null
),
fallback_events as (
select *
from event_base
where session_id is null
)
select
actor_id || ':' || strftime(activity_day, '%Y-%m-%d') as activity_day_key,
actor_id,
max(resolved_person_id) as person_id,
max(canonical_distinct_id) as canonical_distinct_id,
first(api_key order by event_time asc) as api_key,
activity_day,
min(event_time) as first_activity_at,
max(event_time) as last_activity_at,
date_diff('second', min(event_time), max(event_time)) as duration_seconds,
count(*) as event_count,
sum(case when event_type in ('$pageview', 'page_view', '$screen', 'screen') then 1 else 0 end) as pageview_count,
sum(case when event_type = '$autocapture' then 1 else 0 end) as autocapture_count,
first(pathname order by event_time asc) as first_path,
last(pathname order by event_time asc) as last_path,
first(current_url order by event_time asc) as first_url,
last(current_url order by event_time asc) as last_url,
first(referrer order by event_time asc) as referrer,
first(utm_source order by event_time asc) as utm_source,
first(utm_medium order by event_time asc) as utm_medium,
first(utm_campaign order by event_time asc) as utm_campaign,
first(browser order by event_time asc) as browser,
first(os order by event_time asc) as os,
first(device_type order by event_time asc) as device_type,
first(geo_country_code order by event_time asc) as geo_country_code
from fallback_events
group by actor_id, activity_day
description: Actor/day fallback activity rollup for events that did not carry a PostHog $session_id.
primary_key: activity_day_key
default_time_dimension: activity_day
default_grain: day

relationships:
- name: persons
type: many_to_one
foreign_key: actor_id
primary_key: actor_id

dimensions:
- name: activity_day_key
type: categorical
- name: actor_id
type: categorical
- name: person_id
type: categorical
- name: canonical_distinct_id
type: categorical
- name: api_key
type: categorical
- name: first_path
type: categorical
- name: last_path
type: categorical
- name: first_url
type: categorical
- name: last_url
type: categorical
- name: referrer
type: categorical
- name: utm_source
type: categorical
- name: utm_medium
type: categorical
- name: utm_campaign
type: categorical
- name: browser
type: categorical
- name: os
type: categorical
- name: device_type
type: categorical
- name: geo_country_code
type: categorical
- name: duration_seconds
type: numeric
- name: event_count
type: numeric
- name: pageview_count
type: numeric
- name: activity_day
type: time
granularity: day
- name: first_activity_at
type: time
granularity: day
- name: last_activity_at
type: time
granularity: day

metrics:
- name: activity_day_count
agg: count
description: Actor/day fallback rows for events without SDK session ids.
- name: active_users
agg: count_distinct
sql: actor_id
description: Distinct actors with fallback activity days.
- name: fallback_events
agg: sum
sql: event_count
description: Events included in actor/day fallback rollups.
- name: fallback_pageviews
agg: sum
sql: pageview_count
description: Pageviews included in actor/day fallback rollups.
- name: bounced_activity_days
agg: count
filters:
- pageview_count <= 1
description: Fallback actor/day rows with one or fewer pageviews.
- name: engaged_activity_days
agg: count
filters:
- event_count > 1 or duration_seconds >= 10
description: Fallback actor/day rows with multiple events or at least 10 seconds duration.
- name: total_activity_seconds
agg: sum
sql: duration_seconds
description: Total duration across fallback actor/day rollups.

Loading
Loading