diff --git a/relay-profiling/src/lib.rs b/relay-profiling/src/lib.rs index 7332fd66c43..a973bb2de04 100644 --- a/relay-profiling/src/lib.rs +++ b/relay-profiling/src/lib.rs @@ -405,23 +405,21 @@ impl Getter for ExpandedPerfettoChunk { /// Expands a binary Perfetto trace into a Sample v2 profile chunk. /// -/// Decodes the protobuf trace, converts it into the internal Sample v2 format, -/// merges the provided JSON `metadata_json` (containing platform, environment, etc.), -/// and returns an [`ExpandedPerfettoChunk`] with the serialized JSON payload plus +/// Returns an [`ExpandedPerfettoChunk`] with the serialized JSON payload plus /// the profile metadata needed for downstream processing (platform, profile type, /// inbound filtering) — avoiding a second JSON deserialization pass in callers. pub fn expand_perfetto( - perfetto_bytes: &[u8], - metadata_json: &[u8], + perfetto_payload: &[u8], + json_payload: &[u8], ) -> Result { - let d = &mut Deserializer::from_slice(metadata_json); + let d = &mut Deserializer::from_slice(json_payload); let mut chunk: sample::v2::ProfileChunk = serde_path_to_error::deserialize(d).map_err(ProfileError::InvalidJson)?; let platform = chunk.metadata.platform.clone(); let release = chunk.metadata.release.clone(); - let (profile_data, debug_images) = perfetto::convert(perfetto_bytes)?; + let (profile_data, debug_images) = perfetto::convert(perfetto_payload)?; chunk.profile = profile_data; chunk.metadata.debug_meta.images.extend(debug_images); chunk.normalize()?; diff --git a/relay-profiling/tests/fixtures/android/perfetto/profile_chunk.envelope b/relay-profiling/tests/fixtures/android/perfetto/profile_chunk.envelope index 5b1c765bdeb..dc0cf89c9ed 100644 --- a/relay-profiling/tests/fixtures/android/perfetto/profile_chunk.envelope +++ b/relay-profiling/tests/fixtures/android/perfetto/profile_chunk.envelope @@ -1,5 +1,5 @@ {"event_id":"c3b09c0608844f558eaf6e65df6b9cdf","sdk":{"name":"sentry.java.android","version":"8.38.0","packages":[{"name":"maven:io.sentry:sentry","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-core","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-fragment","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-timber","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-replay","version":"8.38.0"},{"name":"maven:io.sentry:sentry-spotlight","version":"8.38.0"},{"name":"maven:io.sentry:sentry-compose","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-ndk","version":"8.38.0"}],"integrations":["Screenshot","ViewHierarchy","UncaughtExceptionHandler","ShutdownHook","Spotlight","SendCachedEnvelope","Ndk","Tombstone","AppLifecycle","AnrV2","AnrProfiling","ActivityLifecycle","ActivityBreadcrumbs","UserInteraction","FeedbackShake","FragmentLifecycle","Timber","AppComponentsBreadcrumbs","NetworkBreadcrumbs","AutoInit","EnvelopeFileObserver","SystemEventsBreadcrumbs"]}} -{"content_type":"application/octet-stream","filename":"profile_sentry-profiling_2026-04-28-08-33-40.perfetto-stack-sample","type":"profile_chunk","platform":"android","meta_length":7739,"length":104991} +{"content_type":"application/x-perfetto-trace","filename":"profile_sentry-profiling_2026-04-28-08-33-40.perfetto-stack-sample","type":"profile_chunk","platform":"android","meta_length":7739,"length":104991} {"profiler_id":"814b081c638b4ad982ae351547bfe499","chunk_id":"c3b09c0608844f558eaf6e65df6b9cdf","client_sdk":{"name":"sentry.java.android","version":"8.38.0","packages":[{"name":"maven:io.sentry:sentry","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-core","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-fragment","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-timber","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-replay","version":"8.38.0"},{"name":"maven:io.sentry:sentry-spotlight","version":"8.38.0"},{"name":"maven:io.sentry:sentry-compose","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-ndk","version":"8.38.0"}],"integrations":["Screenshot","ViewHierarchy","UncaughtExceptionHandler","ShutdownHook","Spotlight","SendCachedEnvelope","Ndk","Tombstone","AppLifecycle","AnrV2","AnrProfiling","ActivityLifecycle","ActivityBreadcrumbs","UserInteraction","FeedbackShake","FragmentLifecycle","Timber","AppComponentsBreadcrumbs","NetworkBreadcrumbs","AutoInit","EnvelopeFileObserver","SystemEventsBreadcrumbs"]},"measurements":{"memory_native_footprint":{"unit":"byte","values":[{"value":3.6631152E7,"elapsed_since_start_ns":"1777358020895000000","timestamp":1777358020.895000},{"value":3.6636E7,"elapsed_since_start_ns":"1777358020994000000","timestamp":1777358020.994000},{"value":3.6598336E7,"elapsed_since_start_ns":"1777358021094000000","timestamp":1777358021.094000},{"value":3.6600496E7,"elapsed_since_start_ns":"1777358021194000000","timestamp":1777358021.193999},{"value":3.6601984E7,"elapsed_since_start_ns":"1777358021294000000","timestamp":1777358021.294000},{"value":3.6604128E7,"elapsed_since_start_ns":"1777358021394000000","timestamp":1777358021.393999},{"value":3.6606272E7,"elapsed_since_start_ns":"1777358021494000000","timestamp":1777358021.494000},{"value":3.6608416E7,"elapsed_since_start_ns":"1777358021594000000","timestamp":1777358021.593999},{"value":3.6614672E7,"elapsed_since_start_ns":"1777358021695000000","timestamp":1777358021.695000},{"value":3.6616816E7,"elapsed_since_start_ns":"1777358021794000000","timestamp":1777358021.794000},{"value":3.661896E7,"elapsed_since_start_ns":"1777358021894000000","timestamp":1777358021.894000},{"value":3.6621104E7,"elapsed_since_start_ns":"1777358021995000000","timestamp":1777358021.995000},{"value":3.6623248E7,"elapsed_since_start_ns":"1777358022094000000","timestamp":1777358022.094000},{"value":3.6625392E7,"elapsed_since_start_ns":"1777358022194000000","timestamp":1777358022.193999},{"value":3.6627536E7,"elapsed_since_start_ns":"1777358022294000000","timestamp":1777358022.294000},{"value":3.662968E7,"elapsed_since_start_ns":"1777358022394000000","timestamp":1777358022.393999},{"value":3.6631824E7,"elapsed_since_start_ns":"1777358022495000000","timestamp":1777358022.495000},{"value":3.6672752E7,"elapsed_since_start_ns":"1777358022594000000","timestamp":1777358022.593999},{"value":3.6748144E7,"elapsed_since_start_ns":"1777358022694000000","timestamp":1777358022.694000},{"value":3.6754304E7,"elapsed_since_start_ns":"1777358022794000000","timestamp":1777358022.794000}]},"frozen_frame_renders":{"unit":"nanosecond","values":[{"value":7.49833322E8,"elapsed_since_start_ns":"71057630775779","timestamp":1777358020.888000}]},"cpu_usage":{"unit":"percent","values":[{"value":56.20094079375762,"elapsed_since_start_ns":"1777358020895000000","timestamp":1777358020.895000},{"value":47.72786092177692,"elapsed_since_start_ns":"1777358020994000000","timestamp":1777358020.994000},{"value":52.289708049827254,"elapsed_since_start_ns":"1777358021094000000","timestamp":1777358021.094000},{"value":50.050196342916244,"elapsed_since_start_ns":"1777358021194000000","timestamp":1777358021.193999},{"value":52.620478795841386,"elapsed_since_start_ns":"1777358021294000000","timestamp":1777358021.294000},{"value":49.83694994597027,"elapsed_since_start_ns":"1777358021394000000","timestamp":1777358021.393999},{"value":52.61821576681683,"elapsed_since_start_ns":"1777358021494000000","timestamp":1777358021.494000},{"value":50.00733407561553,"elapsed_since_start_ns":"1777358021594000000","timestamp":1777358021.593999},{"value":52.31104830862539,"elapsed_since_start_ns":"1777358021695000000","timestamp":1777358021.695000},{"value":50.08750688152257,"elapsed_since_start_ns":"1777358021794000000","timestamp":1777358021.794000},{"value":52.61428295786996,"elapsed_since_start_ns":"1777358021894000000","timestamp":1777358021.894000},{"value":49.84011689221911,"elapsed_since_start_ns":"1777358021995000000","timestamp":1777358021.995000},{"value":50.07609463188072,"elapsed_since_start_ns":"1777358022094000000","timestamp":1777358022.094000},{"value":52.764437950744615,"elapsed_since_start_ns":"1777358022194000000","timestamp":1777358022.193999},{"value":49.7033742388127,"elapsed_since_start_ns":"1777358022294000000","timestamp":1777358022.294000},{"value":52.63426105211958,"elapsed_since_start_ns":"1777358022394000000","timestamp":1777358022.393999},{"value":49.806191656715804,"elapsed_since_start_ns":"1777358022495000000","timestamp":1777358022.495000},{"value":52.611141035437356,"elapsed_since_start_ns":"1777358022594000000","timestamp":1777358022.593999},{"value":32.55163503106803,"elapsed_since_start_ns":"1777358022694000000","timestamp":1777358022.694000},{"value":2.50511253386361,"elapsed_since_start_ns":"1777358022794000000","timestamp":1777358022.794000}]},"memory_footprint":{"unit":"byte","values":[{"value":1.18884E7,"elapsed_since_start_ns":"1777358020895000000","timestamp":1777358020.895000},{"value":1.2003504E7,"elapsed_since_start_ns":"1777358020994000000","timestamp":1777358020.994000},{"value":1.2056752E7,"elapsed_since_start_ns":"1777358021094000000","timestamp":1777358021.094000},{"value":1.211E7,"elapsed_since_start_ns":"1777358021194000000","timestamp":1777358021.193999},{"value":1.213048E7,"elapsed_since_start_ns":"1777358021294000000","timestamp":1777358021.294000},{"value":1.215096E7,"elapsed_since_start_ns":"1777358021394000000","timestamp":1777358021.393999},{"value":1.22124E7,"elapsed_since_start_ns":"1777358021494000000","timestamp":1777358021.494000},{"value":1.223288E7,"elapsed_since_start_ns":"1777358021594000000","timestamp":1777358021.593999},{"value":1.2286128E7,"elapsed_since_start_ns":"1777358021695000000","timestamp":1777358021.695000},{"value":1.2339376E7,"elapsed_since_start_ns":"1777358021794000000","timestamp":1777358021.794000},{"value":1.2359856E7,"elapsed_since_start_ns":"1777358021894000000","timestamp":1777358021.894000},{"value":1.2421296E7,"elapsed_since_start_ns":"1777358021995000000","timestamp":1777358021.995000},{"value":1.2441776E7,"elapsed_since_start_ns":"1777358022094000000","timestamp":1777358022.094000},{"value":1.2495024E7,"elapsed_since_start_ns":"1777358022194000000","timestamp":1777358022.193999},{"value":1.2515504E7,"elapsed_since_start_ns":"1777358022294000000","timestamp":1777358022.294000},{"value":1.2535984E7,"elapsed_since_start_ns":"1777358022394000000","timestamp":1777358022.393999},{"value":1.2597424E7,"elapsed_since_start_ns":"1777358022495000000","timestamp":1777358022.495000},{"value":1.2617904E7,"elapsed_since_start_ns":"1777358022594000000","timestamp":1777358022.593999},{"value":1.2892512E7,"elapsed_since_start_ns":"1777358022694000000","timestamp":1777358022.694000},{"value":1.294576E7,"elapsed_since_start_ns":"1777358022794000000","timestamp":1777358022.794000}]},"screen_frame_rates":{"unit":"hz","values":[{"value":60.000003814697266,"elapsed_since_start_ns":"71057630775779","timestamp":1777358020.888000}]}},"platform":"android","release":"io.sentry.samples.android@8.38.0+2","environment":"debug","version":"2","content_type":"perfetto","timestamp":1777358020.855000} U2N diff --git a/relay-server/src/envelope/content_type.rs b/relay-server/src/envelope/content_type.rs index 7cadac64853..527e920ad94 100644 --- a/relay-server/src/envelope/content_type.rs +++ b/relay-server/src/envelope/content_type.rs @@ -38,6 +38,8 @@ pub enum ContentType { TraceAttachment, /// `application/vnd.sentry.attachment-ref+json` AttachmentRef, + /// `application/x-perfetto-trace` + PerfettoTrace, /// All integration content types. Integration(Integration), } @@ -60,6 +62,7 @@ impl ContentType { Self::TraceMetricContainer => "application/vnd.sentry.items.trace-metric+json", Self::TraceAttachment => "application/vnd.sentry.trace-attachment", Self::AttachmentRef => "application/vnd.sentry.attachment-ref+json", + Self::PerfettoTrace => "application/x-perfetto-trace", Self::Integration(integration) => integration.as_content_type(), } } @@ -109,6 +112,8 @@ impl ContentType { || ct.eq_ignore_ascii_case("application/vnd.sentry.attachment-ref") { Some(Self::AttachmentRef) + } else if ct.eq_ignore_ascii_case(Self::PerfettoTrace.as_str()) { + Some(Self::PerfettoTrace) } else { Integration::from_content_type(ct).map(Self::Integration) } diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs index d1c69d39a5c..812cc838410 100644 --- a/relay-server/src/processing/profile_chunks/mod.rs +++ b/relay-server/src/processing/profile_chunks/mod.rs @@ -1,14 +1,16 @@ use std::sync::Arc; +use bytes::Bytes; +use smallvec::smallvec; + use relay_profiling::ProfileType; use relay_quotas::{DataCategory, RateLimits}; use crate::Envelope; -use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items}; +use crate::envelope::{ContentType, EnvelopeHeaders, Item, ItemType, Items}; use crate::managed::{Counted, Managed, ManagedEnvelope, ManagedResult as _, Quantities, Rejected}; use crate::processing::{self, Context, CountRateLimited, Forward, Output, QuotaRateLimiter}; use crate::services::outcome::{DiscardReason, Outcome}; -use smallvec::smallvec; mod filter; mod process; @@ -56,6 +58,87 @@ impl crate::managed::OutcomeError for Error { } } +/// Serialized profile chunks extracted from an envelope. +#[derive(Debug)] +pub struct SerializedProfileChunks { + /// Original envelope headers. + pub headers: EnvelopeHeaders, + /// List of serialized profile chunk items. + pub profile_chunks: Vec, +} + +impl Counted for SerializedProfileChunks { + fn quantities(&self) -> Quantities { + let mut ui = 0; + let mut backend = 0; + + for pc in &self.profile_chunks { + match pc.profile_type() { + Some(ProfileType::Ui) => ui += 1, + Some(ProfileType::Backend) => backend += 1, + None => {} + } + } + + let mut quantities = smallvec![]; + if ui > 0 { + quantities.push((DataCategory::ProfileChunkUi, ui)); + } + if backend > 0 { + quantities.push((DataCategory::ProfileChunk, backend)); + } + + quantities + } +} + +impl CountRateLimited for Managed { + type Error = Error; +} + +#[derive(Debug)] +#[cfg_attr(all(not(feature = "processing"), not(test)), expect(dead_code))] +pub struct RawProfile { + pub payload: Bytes, + pub content_type: ContentType, +} + +/// A single profile chunk after expansion. +#[derive(Debug)] +#[cfg_attr(all(not(feature = "processing"), not(test)), expect(dead_code))] +pub struct ExpandedProfileChunk { + pub payload: Bytes, + pub raw_profile: Option, + pub quantities: Quantities, +} + +impl Counted for ExpandedProfileChunk { + fn quantities(&self) -> Quantities { + self.quantities.clone() + } +} + +/// Profile chunks after expansion: all items have been parsed, validated, and +/// converted into typed representations. +#[derive(Debug)] +pub struct ExpandedProfileChunks { + pub chunks: Vec, +} + +impl Counted for ExpandedProfileChunks { + fn quantities(&self) -> Quantities { + let mut q = Quantities::new(); + for chunk in &self.chunks { + q.extend(chunk.quantities()); + } + q + } +} + +impl CountRateLimited for Managed { + type Error = Error; +} + /// A processor for profile chunks. /// /// It processes items of type: [`ItemType::ProfileChunk`]. @@ -97,31 +180,48 @@ impl processing::Processor for ProfileChunksProcessor { async fn process( &self, - mut profile_chunks: Managed, + profile_chunks: Managed, ctx: Context<'_>, ) -> Result, Rejected> { filter::feature_flag(ctx).reject(&profile_chunks)?; - process::process(&mut profile_chunks, ctx); + if !ctx.is_processing() { + let profile_chunks = self.limiter.enforce_quotas(profile_chunks, ctx).await?; + return Ok(Output::just(ProfileChunkOutput::Serialized(profile_chunks))); + } - let profile_chunks = self.limiter.enforce_quotas(profile_chunks, ctx).await?; + let expanded: Managed = process::expand(profile_chunks, ctx); + let expanded = self.limiter.enforce_quotas(expanded, ctx).await?; - Ok(Output::just(ProfileChunkOutput(profile_chunks))) + Ok(Output::just(ProfileChunkOutput::Expanded(expanded))) } } /// Output produced by [`ProfileChunksProcessor`]. #[derive(Debug)] -pub struct ProfileChunkOutput(Managed); +#[expect( + clippy::large_enum_variant, + reason = "variants are sized by Managed which wraps different pipeline stages" +)] +pub enum ProfileChunkOutput { + /// Non-processing relay: items forwarded as-is. + Serialized(Managed), + /// Processing relay: items expanded into typed representations. + Expanded(Managed), +} impl Forward for ProfileChunkOutput { fn serialize_envelope( self, _: processing::ForwardContext<'_>, ) -> Result>, Rejected<()>> { - let Self(profile_chunks) = self; - Ok(profile_chunks - .map(|pc, _| Envelope::from_parts(pc.headers, Items::from_vec(pc.profile_chunks)))) + match self { + Self::Serialized(profile_chunks) => Ok(profile_chunks + .map(|pc, _| Envelope::from_parts(pc.headers, Items::from_vec(pc.profile_chunks)))), + Self::Expanded(m) => { + Err(m.internal_error("serialize_envelope called with expanded profile chunks")) + } + } } #[cfg(feature = "processing")] @@ -130,24 +230,24 @@ impl Forward for ProfileChunkOutput { s: processing::forward::StoreHandle<'_>, ctx: processing::ForwardContext<'_>, ) -> Result<(), Rejected<()>> { - use crate::services::store::{RawProfileContentType, StoreProfileChunk}; - - let Self(profile_chunks) = self; + use crate::services::store::StoreProfileChunk; + + let expanded = match self { + Self::Expanded(e) => e, + Self::Serialized(m) => { + return Err( + m.internal_error("forward_store called with non-expanded profile chunks") + ); + } + }; let retention_days = ctx.event_retention().standard; - for item in profile_chunks.split(|pc| pc.profile_chunks) { - let (kafka_payload, raw_profile) = split_item_payload(&item); - - s.send_to_store(item.map(|item, _| StoreProfileChunk { + for chunk in expanded.split(|e| e.chunks) { + s.send_to_store(chunk.map(|chunk, _| StoreProfileChunk { retention_days, - payload: kafka_payload, - quantities: item.quantities(), - raw_profile_content_type: if raw_profile.is_some() { - Some(RawProfileContentType::Perfetto) - } else { - None - }, - raw_profile, + payload: chunk.payload, + quantities: chunk.quantities, + raw_profile: chunk.raw_profile, })); } @@ -155,149 +255,31 @@ impl Forward for ProfileChunkOutput { } } -/// Splits a profile chunk item payload into its constituent parts. -/// -/// For compound items (those with a `meta_length` header), the payload is -/// `[expanded JSON][raw binary]`. Returns `(kafka_payload, raw_profile)`. -/// -/// For plain items, returns `(full_payload, None)`. -#[cfg(any(feature = "processing", test))] -fn split_item_payload(item: &Item) -> (bytes::Bytes, Option) { - let payload = item.payload(); - - let Some(meta_length) = item.meta_length() else { - return (payload, None); - }; - - let meta_length = meta_length as usize; - let Some((meta, body)) = payload.split_at_checked(meta_length) else { - return (payload, None); - }; - - if body.is_empty() { - return (payload.slice_ref(meta), None); - } - - (payload.slice_ref(meta), Some(payload.slice_ref(body))) -} - -/// Serialized profile chunks extracted from an envelope. -#[derive(Debug)] -pub struct SerializedProfileChunks { - /// Original envelope headers. - pub headers: EnvelopeHeaders, - /// List of serialized profile chunk items. - pub profile_chunks: Vec, -} - -impl Counted for SerializedProfileChunks { - fn quantities(&self) -> Quantities { - let mut ui = 0; - let mut backend = 0; - - for pc in &self.profile_chunks { - match pc.profile_type() { - Some(ProfileType::Ui) => ui += 1, - Some(ProfileType::Backend) => backend += 1, - None => {} - } - } - - let mut quantities = smallvec![]; - if ui > 0 { - quantities.push((DataCategory::ProfileChunkUi, ui)); - } - if backend > 0 { - quantities.push((DataCategory::ProfileChunk, backend)); - } - - quantities - } -} - -impl CountRateLimited for Managed { - type Error = Error; -} - #[cfg(test)] mod tests { - use similar_asserts::assert_eq; - - use crate::envelope::ContentType; - use super::*; - - fn make_chunk_item(meta: &[u8]) -> Item { - let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::Json, bytes::Bytes::copy_from_slice(meta)); - item - } - - fn make_compound_item(meta: &[u8], body: &[u8]) -> Item { - let meta_length = meta.len(); - let mut payload = bytes::BytesMut::with_capacity(meta_length + body.len()); - payload.extend_from_slice(meta); - payload.extend_from_slice(body); - - let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::OctetStream, payload.freeze()); - item.set_meta_length(meta_length as u32); - item - } - - #[test] - fn test_split_plain_chunk() { - let item = make_chunk_item(b"{}"); - let (payload, raw) = split_item_payload(&item); - assert_eq!(payload.as_ref(), b"{}"); - assert!(raw.is_none()); - } - - #[test] - fn test_split_compound_chunk() { - let meta = br#"{"content_type":"perfetto"}"#; - let body = b"binary-data"; - let item = make_compound_item(meta, body); - - let (payload, raw) = split_item_payload(&item); - assert_eq!(payload.as_ref(), meta.as_ref()); - assert_eq!(raw.as_deref(), Some(b"binary-data".as_ref())); - } - - #[test] - fn test_split_compound_empty_body() { - let meta = br#"{"content_type":"perfetto"}"#; - let item = make_compound_item(meta, b""); - - let (payload, raw) = split_item_payload(&item); - assert_eq!(payload.as_ref(), meta.as_ref()); - assert!(raw.is_none()); + use crate::processing::Context; + + fn make_expanded( + chunks: Vec, + ) -> ( + Managed, + crate::managed::ManagedTestHandle, + ) { + Managed::for_test(ExpandedProfileChunks { chunks }).build() } #[test] - fn test_split_compound_meta_length_exceeds_payload() { - // meta_length is set to more bytes than the payload actually contains. - // split_at_checked returns None, so we fall back to the full payload with no split. - let body = b"binary-data"; - let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::OctetStream, bytes::Bytes::from(body.as_ref())); - item.set_meta_length(body.len() as u32 + 100); - - let (payload, raw) = split_item_payload(&item); - assert_eq!(payload.as_ref(), body.as_ref()); - assert!(raw.is_none()); - } + #[should_panic(expected = "serialize_envelope called with expanded profile chunks")] + fn test_serialize_envelope_rejects_expanded() { + let chunk = ExpandedProfileChunk { + payload: Bytes::from(b"{\"hello\":\"world\"}".as_ref()), + raw_profile: None, + quantities: smallvec![(DataCategory::ProfileChunk, 1)], + }; + let (managed, _handle) = make_expanded(vec![chunk]); + let output = ProfileChunkOutput::Expanded(managed); - #[test] - fn test_split_compound_zero_meta_length() { - // meta_length = 0: meta slice is empty, entire payload is treated as body. - let body = b"binary-data"; - let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::OctetStream, bytes::Bytes::from(body.as_ref())); - item.set_meta_length(0); - - let (payload, raw) = split_item_payload(&item); - assert_eq!(payload.as_ref(), b""); - assert_eq!(raw.as_deref(), Some(b"binary-data".as_ref())); + let _ = output.serialize_envelope(Context::for_test().to_forward()); } } diff --git a/relay-server/src/processing/profile_chunks/process.rs b/relay-server/src/processing/profile_chunks/process.rs index a99896b1074..576723e9da2 100644 --- a/relay-server/src/processing/profile_chunks/process.rs +++ b/relay-server/src/processing/profile_chunks/process.rs @@ -1,177 +1,158 @@ use std::net::IpAddr; +use bytes::Bytes; +use smallvec::smallvec; + use relay_dynamic_config::Feature; use relay_profiling::ProfileType; use relay_quotas::DataCategory; -use crate::envelope::{ContentType, Item, ItemType}; +use crate::envelope::ContentType; +use crate::managed::{Quantities, RecordKeeper}; use crate::processing::Context; use crate::processing::Managed; -use crate::processing::profile_chunks::{Error, Result, SerializedProfileChunks}; +use crate::processing::profile_chunks::{ + Error, ExpandedProfileChunk, ExpandedProfileChunks, RawProfile, Result, SerializedProfileChunks, +}; use crate::statsd::RelayCounters; use crate::utils; -/// Processes profile chunks. -pub fn process(profile_chunks: &mut Managed, ctx: Context<'_>) { - // Only run this 'expensive' processing step in processing Relays. - if !ctx.is_processing() { - return; - } - - let sdk = utils::client_name_tag(profile_chunks.headers.meta().client_name()); - let client_ip = profile_chunks.headers.meta().client_addr(); - let filter_settings = &ctx.project_info.config.filter_settings; - - profile_chunks.retain( - |pc| &mut pc.profile_chunks, - |item, records| -> Result<()> { - if let Some(meta_length) = item.meta_length() { - return process_compound_item( - item, - meta_length, - sdk, - client_ip, - filter_settings, - ctx, - records, - ); - } - - let pc = relay_profiling::ProfileChunk::new(item.payload())?; - - // Validate the item inferred profile type with the one from the payload, - // or if missing set it. - // - // This is currently necessary to ensure profile chunks are emitted in the correct - // data category, as well as rate limited with the correct data category. - // - // In the future we plan to make the profile type on the item header a necessity. - // For more context see also: . - if item - .profile_type() - .is_some_and(|pt| pt != pc.profile_type()) - { - return Err(relay_profiling::ProfileError::InvalidProfileType.into()); - } +/// Expands serialized profile chunk items into typed representations. +/// +/// Each item is individually parsed and validated. Items that fail are +/// removed with outcome tracking. +pub fn expand( + chunks: Managed, + ctx: Context<'_>, +) -> Managed { + chunks.map(|serialized, records| { + let sdk = utils::client_name_tag(serialized.headers.meta().client_name()); + let client_ip = serialized.headers.meta().client_addr(); + let filter_settings = &ctx.project_info.config.filter_settings; + + let mut expanded = Vec::with_capacity(serialized.profile_chunks.len()); + + for item in serialized.profile_chunks { + let payload = item.payload(); + let is_perfetto = matches!(item.content_type(), Some(ContentType::PerfettoTrace)); + + let result = if is_perfetto { + expand_perfetto_profile_chunk(&item, client_ip, filter_settings, ctx, payload) + } else { + expand_json_item(&item, client_ip, filter_settings, ctx, payload) + }; - // Update the profile type to ensure the following outcomes are emitted in the correct - // data category. - // - // Once the item header on the item is required, this is no longer required. - if item.profile_type().is_none() { - relay_statsd::metric!( - counter(RelayCounters::ProfileChunksWithoutPlatform) += 1, - sdk = sdk - ); - - item.set_platform(pc.platform().to_owned()); - debug_assert_eq!(item.profile_type(), Some(pc.profile_type())); - match pc.profile_type() { - ProfileType::Ui => records.modify_by(DataCategory::ProfileChunkUi, 1), - ProfileType::Backend => records.modify_by(DataCategory::ProfileChunk, 1), + match result { + Ok(chunk) => { + track_quantities(&item, sdk, &chunk.quantities, records); + expanded.push(chunk); + } + Err(err) => { + track_quantities(&item, sdk, &item.quantities(), records); + drop(records.reject_err(err, &item)); } } + } - pc.filter(client_ip, filter_settings, ctx.global_config)?; - - let expanded = pc.expand()?; - if expanded.len() > ctx.config.max_profile_size() { - return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); - } - - *item = { - let mut new_item = Item::new(ItemType::ProfileChunk); - new_item.set_platform(pc.platform().to_owned()); - new_item.set_payload(ContentType::Json, expanded); - new_item - }; - - Ok(()) - }, - ); + ExpandedProfileChunks { chunks: expanded } + }) } -/// Processes a compound profile chunk item (JSON metadata + binary blob). -/// -/// The item payload is `[JSON metadata bytes][binary blob bytes]`, split at `meta_length`. -/// After expansion, the item is rebuilt with `[expanded JSON][raw binary]` and an updated -/// `meta_length`, so that `forward_store` can still extract the raw profile. -fn process_compound_item( - item: &mut Item, - meta_length: u32, - sdk: &str, +fn expand_perfetto_profile_chunk( + item: &crate::envelope::Item, client_ip: Option, filter_settings: &relay_filter::ProjectFiltersConfig, ctx: Context<'_>, - records: &mut crate::managed::RecordKeeper, -) -> Result<()> { - let payload = item.payload(); - let meta_length = meta_length as usize; - - let Some((meta_json, raw_profile)) = payload.split_at_checked(meta_length) else { - return Err(relay_profiling::ProfileError::InvalidSampledProfile.into()); - }; - - #[derive(serde::Deserialize)] - struct ContentTypeProbe { - content_type: Option, - } - match serde_json::from_slice::(meta_json) - .ok() - .and_then(|v| v.content_type) - .as_deref() - { - Some("perfetto") => {} - _ => return Err(relay_profiling::ProfileError::PlatformNotSupported.into()), - } - + payload: Bytes, +) -> Result { if ctx.should_filter(Feature::ContinuousProfilingPerfetto) { return Err(Error::FilterFeatureFlag); } + if item.platform().is_none() { + return Err(relay_profiling::ProfileError::PlatformNotSupported.into()); + } - let expanded = relay_profiling::expand_perfetto(raw_profile, meta_json)?; - + let profile_type = item + .profile_type() + .ok_or(relay_profiling::ProfileError::InvalidProfileType)?; + let meta_length = + item.meta_length() + .ok_or(relay_profiling::ProfileError::InvalidSampledProfile)? as usize; + let (json_payload, perfetto_payload) = payload + .split_at_checked(meta_length) + .ok_or(relay_profiling::ProfileError::InvalidSampledProfile)?; + let expanded = relay_profiling::expand_perfetto(perfetto_payload, json_payload)?; + if expanded.profile_type() != profile_type { + return Err(relay_profiling::ProfileError::InvalidProfileType.into()); + } + expanded.filter(client_ip, filter_settings, ctx.global_config)?; if expanded.payload.len() > ctx.config.max_profile_size() { return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); } + Ok(ExpandedProfileChunk { + payload: Bytes::from(expanded.payload), + raw_profile: Some(RawProfile { + payload: payload.slice_ref(perfetto_payload), + content_type: ContentType::PerfettoTrace, + }), + quantities: quantities_for(profile_type), + }) +} - if item - .profile_type() - .is_some_and(|pt| pt != expanded.profile_type()) - { +fn expand_json_item( + item: &crate::envelope::Item, + client_ip: Option, + filter_settings: &relay_filter::ProjectFiltersConfig, + ctx: Context<'_>, + payload: Bytes, +) -> Result { + if item.meta_length().is_some() { + return Err(relay_profiling::ProfileError::InvalidSampledProfile.into()); + } + let pc = relay_profiling::ProfileChunk::new(payload)?; + let profile_type = pc.profile_type(); + validate_profile_type(item, profile_type)?; + pc.filter(client_ip, filter_settings, ctx.global_config)?; + let expanded = pc.expand()?; + if expanded.len() > ctx.config.max_profile_size() { + return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); + } + Ok(ExpandedProfileChunk { + payload: Bytes::from(expanded), + raw_profile: None, + quantities: quantities_for(profile_type), + }) +} + +fn validate_profile_type(item: &crate::envelope::Item, profile_type: ProfileType) -> Result<()> { + if item.profile_type().is_some_and(|pt| pt != profile_type) { return Err(relay_profiling::ProfileError::InvalidProfileType.into()); } + Ok(()) +} +fn track_quantities( + item: &crate::envelope::Item, + sdk: &str, + quantities: &Quantities, + records: &mut RecordKeeper<'_>, +) { if item.profile_type().is_none() { relay_statsd::metric!( counter(RelayCounters::ProfileChunksWithoutPlatform) += 1, sdk = sdk ); - match expanded.profile_type() { - ProfileType::Ui => records.modify_by(DataCategory::ProfileChunkUi, 1), - ProfileType::Backend => records.modify_by(DataCategory::ProfileChunk, 1), + for &(category, quantity) in quantities { + records.modify_by(category, quantity as isize); } } +} - expanded.filter(client_ip, filter_settings, ctx.global_config)?; - - // Rebuild the compound payload: [expanded JSON][raw binary]. - // This preserves the raw profile for downstream extraction in forward_store. - let platform = expanded.platform; - let expanded_payload = bytes::Bytes::from(expanded.payload); - let mut compound = bytes::BytesMut::with_capacity(expanded_payload.len() + raw_profile.len()); - compound.extend_from_slice(&expanded_payload); - compound.extend_from_slice(raw_profile); - - *item = { - let mut new_item = Item::new(ItemType::ProfileChunk); - new_item.set_platform(platform); - new_item.set_payload(ContentType::Json, compound.freeze()); - new_item.set_meta_length(expanded_payload.len() as u32); - new_item - }; - - Ok(()) +fn quantities_for(profile_type: ProfileType) -> Quantities { + match profile_type { + ProfileType::Ui => smallvec![(DataCategory::ProfileChunkUi, 1)], + ProfileType::Backend => smallvec![(DataCategory::ProfileChunk, 1)], + } } #[cfg(test)] @@ -182,17 +163,19 @@ mod tests { use super::*; use crate::Envelope; - use crate::envelope::ContentType; + use crate::envelope::{ContentType, Item, ItemType}; use crate::extractors::RequestMeta; - use crate::managed::Managed; - use crate::processing::Context; use crate::processing::profile_chunks::SerializedProfileChunks; + use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::projects::project::ProjectInfo; const PERFETTO_FIXTURE: &[u8] = include_bytes!( "../../../../relay-profiling/tests/fixtures/android/perfetto/android.pftrace" ); + const JSON_FIXTURE: &[u8] = + include_bytes!("../../../../relay-profiling/tests/fixtures/sample/v2/valid.json"); + fn perfetto_meta() -> Vec { serde_json::json!({ "version": "2", @@ -206,14 +189,15 @@ mod tests { .into_bytes() } - fn make_compound_item(meta: &[u8], body: &[u8]) -> Item { + fn make_compound_item(meta: &[u8], body: &[u8], platform: &str) -> Item { let meta_length = meta.len() as u32; let mut payload = bytes::BytesMut::new(); payload.extend_from_slice(meta); payload.extend_from_slice(body); let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::OctetStream, payload.freeze()); + item.set_payload(ContentType::PerfettoTrace, payload.freeze()); item.set_meta_length(meta_length); + item.set_platform(platform.to_owned()); item } @@ -235,92 +219,117 @@ mod tests { .build() } - /// Runs `process_compound_item` for the single item in `managed` and returns the - /// inner [`SerializedProfileChunks`] after processing, consuming the managed value. - fn run(managed: &mut Managed, ctx: Context<'_>) { - let sdk = ""; - let client_ip = None; - let filter_settings = Default::default(); - managed.retain( - |pc| &mut pc.profile_chunks, - |item, records| -> Result<()> { - let meta_length = item.meta_length().unwrap_or(0); - process_compound_item( - item, - meta_length, - sdk, - client_ip, - &filter_settings, - ctx, - records, - ) - }, - ); - } - #[test] - fn test_process_compound_unknown_content_type() { - // content_type is not "perfetto" → item is dropped immediately. - let meta = serde_json::json!({ - "version": "2", - "chunk_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - "profiler_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", - "platform": "android", - "content_type": "unknown", - "client_sdk": {"name": "sentry-android", "version": "1.0"}, - }) - .to_string() - .into_bytes(); - let item = make_compound_item(&meta, PERFETTO_FIXTURE); - let (mut managed, _handle) = make_chunks(vec![item]); - - run(&mut managed, Context::for_test()); + fn test_expand_compound_unknown_content_type() { + let meta = perfetto_meta(); + let meta_length = meta.len() as u32; + let mut payload = bytes::BytesMut::new(); + payload.extend_from_slice(&meta); + payload.extend_from_slice(PERFETTO_FIXTURE); + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::OctetStream, payload.freeze()); + item.set_meta_length(meta_length); + let (managed, _handle) = make_chunks(vec![item]); - let chunks = managed.accept(|c| c); - assert!(chunks.profile_chunks.is_empty(), "item should be dropped"); + let expanded = expand(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); + assert!(chunks.chunks.is_empty(), "item should be dropped"); } #[test] - fn test_process_compound_feature_flag_disabled() { - // The ContinuousProfilingPerfetto feature is absent → item is dropped. - // Default Context::for_test() uses relay mode = Managed with an empty feature set. + fn test_expand_compound_feature_flag_disabled() { let meta = perfetto_meta(); - let item = make_compound_item(&meta, PERFETTO_FIXTURE); - let (mut managed, _handle) = make_chunks(vec![item]); - - run(&mut managed, Context::for_test()); + let item = make_compound_item(&meta, PERFETTO_FIXTURE, "android"); + let (managed, _handle) = make_chunks(vec![item]); - let chunks = managed.accept(|c| c); + let expanded = expand(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); assert!( - chunks.profile_chunks.is_empty(), + chunks.chunks.is_empty(), "item should be dropped when feature flag is absent" ); } #[test] - fn test_process_compound_meta_length_out_of_bounds() { - // meta_length header is larger than the actual payload → InvalidSampledProfile. + fn test_expand_compound_meta_length_out_of_bounds() { let body = b"some bytes"; let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::OctetStream, bytes::Bytes::from(body.as_ref())); + item.set_payload( + ContentType::PerfettoTrace, + bytes::Bytes::from(body.as_ref()), + ); item.set_meta_length(body.len() as u32 + 100); - let (mut managed, _handle) = make_chunks(vec![item]); + item.set_platform("android".to_owned()); + let (managed, mut handle) = make_chunks(vec![item]); - run(&mut managed, Context::for_test()); + let ctx = Context { + project_info: &ProjectInfo { + config: ProjectConfig { + features: FeatureSet::from_iter([ + Feature::ContinuousProfiling, + Feature::ContinuousProfilingPerfetto, + ]), + ..Default::default() + }, + ..Default::default() + }, + ..Context::for_test() + }; - let chunks = managed.accept(|c| c); + let expanded = expand(managed, ctx); + let chunks = expanded.accept(|c| c); assert!( - chunks.profile_chunks.is_empty(), + chunks.chunks.is_empty(), "item should be dropped on out-of-bounds meta_length" ); + handle.assert_outcome( + &Outcome::Invalid(DiscardReason::Profiling( + "profiling_invalid_sampled_profile", + )), + DataCategory::ProfileChunkUi, + 1, + ); + } + + #[test] + fn test_expand_compound_missing_platform() { + let meta = perfetto_meta(); + let meta_length = meta.len() as u32; + let mut payload = bytes::BytesMut::new(); + payload.extend_from_slice(&meta); + payload.extend_from_slice(PERFETTO_FIXTURE); + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::PerfettoTrace, payload.freeze()); + item.set_meta_length(meta_length); + let (managed, _handle) = make_chunks(vec![item]); + + let ctx = Context { + project_info: &ProjectInfo { + config: ProjectConfig { + features: FeatureSet::from_iter([ + Feature::ContinuousProfiling, + Feature::ContinuousProfilingPerfetto, + ]), + ..Default::default() + }, + ..Default::default() + }, + ..Context::for_test() + }; + + let expanded = expand(managed, ctx); + let chunks = expanded.accept(|c| c); + assert!( + chunks.chunks.is_empty(), + "perfetto item without platform header should be rejected" + ); } #[test] - fn test_process_compound_success() { - // Happy path: valid Perfetto trace + feature enabled → compound payload rebuilt. + fn test_expand_compound_success() { let meta = perfetto_meta(); - let item = make_compound_item(&meta, PERFETTO_FIXTURE); - let (mut managed, _handle) = make_chunks(vec![item]); + let item = make_compound_item(&meta, PERFETTO_FIXTURE, "android"); + let (managed, _handle) = make_chunks(vec![item]); let ctx = Context { project_info: &ProjectInfo { @@ -336,28 +345,78 @@ mod tests { ..Context::for_test() }; - run(&mut managed, ctx); + let expanded = expand(managed, ctx); + let chunks = expanded.accept(|c| c); + assert_eq!(chunks.chunks.len(), 1, "item should be retained"); - let mut chunks = managed.accept(|c| c); - assert_eq!(chunks.profile_chunks.len(), 1, "item should be retained"); + let chunk = &chunks.chunks[0]; + assert!( + serde_json::from_slice::(&chunk.payload).is_ok(), + "payload must be valid JSON" + ); + let raw_profile = chunk.raw_profile.as_ref().expect("expected raw_profile"); + assert_eq!(raw_profile.payload.as_ref(), PERFETTO_FIXTURE); + assert_eq!(raw_profile.content_type, ContentType::PerfettoTrace); + } - let item = chunks.profile_chunks.remove(0); + fn make_json_item(payload: &[u8]) -> Item { + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::Json, bytes::Bytes::from(payload.to_vec())); + item + } + + #[test] + fn test_expand_json_success() { + let item = make_json_item(JSON_FIXTURE); + let (managed, _handle) = make_chunks(vec![item]); + + let expanded = expand(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); + assert_eq!(chunks.chunks.len(), 1, "item should be retained"); + + let chunk = &chunks.chunks[0]; + assert!( + serde_json::from_slice::(&chunk.payload).is_ok(), + "payload must be valid JSON" + ); + assert!( + chunk.raw_profile.is_none(), + "JSON items should not have raw_profile" + ); + } - // The rebuilt item must carry a meta_length pointing to the expanded JSON. - let meta_length = item - .meta_length() - .expect("rebuilt item must have meta_length"); - assert!(meta_length > 0); + #[test] + fn test_expand_json_with_meta_length_rejected() { + let mut item = make_json_item(JSON_FIXTURE); + item.set_meta_length(10); + let (managed, _handle) = make_chunks(vec![item]); - // The first meta_length bytes must be valid JSON (the expanded Sample v2 profile). - let payload = item.payload(); - let (json_part, raw_part) = payload.split_at(meta_length as usize); + let expanded = expand(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); assert!( - serde_json::from_slice::(json_part).is_ok(), - "first meta_length bytes must be valid JSON" + chunks.chunks.is_empty(), + "JSON item with meta_length should be rejected" ); + } - // The raw binary is the original Perfetto trace preserved verbatim. - assert_eq!(raw_part, PERFETTO_FIXTURE); + #[test] + fn test_expand_json_mismatched_profile_type() { + let mut item = make_json_item(JSON_FIXTURE); + // fixture has platform "cocoa" → ProfileType::Ui, + // but "node" → ProfileType::Backend, creating a mismatch + item.set_platform("node".to_owned()); + let (managed, mut handle) = make_chunks(vec![item]); + + let expanded = expand(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); + assert!( + chunks.chunks.is_empty(), + "JSON item with mismatched profile_type header should be rejected" + ); + handle.assert_outcome( + &Outcome::Invalid(DiscardReason::Profiling("profiling_invalid_profile_type")), + DataCategory::ProfileChunk, + 1, + ); } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 2a7bda48a58..1bf6cd7ea92 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -36,6 +36,7 @@ use relay_threading::AsyncPool; use crate::envelope::{AttachmentPlaceholder, AttachmentType, ContentType, Item, ItemType}; use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes}; +use crate::processing::profile_chunks::RawProfile; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; @@ -147,14 +148,6 @@ impl Counted for StoreSpanV2 { } } -/// Content type of a raw binary profile blob sent alongside the expanded JSON payload. -#[derive(Clone, Debug, Serialize)] -#[serde(rename_all = "lowercase")] -pub enum RawProfileContentType { - /// Perfetto binary trace format. - Perfetto, -} - /// Publishes a singular profile chunk to Kafka. #[derive(Debug)] pub struct StoreProfileChunk { @@ -170,9 +163,7 @@ pub struct StoreProfileChunk { /// /// Sent alongside the expanded JSON payload because the expansion only extracts a /// minimum of information; the raw profile is preserved for further processing downstream. - pub raw_profile: Option, - /// Content type of `raw_profile`. - pub raw_profile_content_type: Option, + pub raw_profile: Option, } impl Counted for StoreProfileChunk { @@ -863,8 +854,8 @@ impl StoreService { scoping.project_id.to_string(), )]), payload: message.payload, - raw_profile: message.raw_profile, - raw_profile_content_type: message.raw_profile_content_type, + raw_profile: message.raw_profile.as_ref().map(|r| r.payload.clone()), + raw_profile_content_type: message.raw_profile.map(|r| r.content_type), }; self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message)) @@ -1708,7 +1699,7 @@ struct ProfileChunkKafkaMessage { #[serde(skip_serializing_if = "Option::is_none")] raw_profile: Option, #[serde(skip_serializing_if = "Option::is_none")] - raw_profile_content_type: Option, + raw_profile_content_type: Option, } /// An enum over all possible ingest messages. @@ -1935,6 +1926,47 @@ mod tests { use super::*; + #[test] + fn test_profile_chunk_kafka_message_without_raw_profile() { + let message = ProfileChunkKafkaMessage { + organization_id: OrganizationId::new(1), + project_id: ProjectId::new(42), + received: 1234567890, + retention_days: 90, + headers: BTreeMap::new(), + payload: Bytes::from(b"{\"profile\":true}".as_ref()), + raw_profile: None, + raw_profile_content_type: None, + }; + let json = serde_json::to_value(&message).unwrap(); + assert_eq!(json["organization_id"], 1); + assert_eq!(json["project_id"], 42); + assert!(json.get("raw_profile").is_none()); + assert!(json.get("raw_profile_content_type").is_none()); + } + + #[test] + fn test_profile_chunk_kafka_message_with_raw_profile() { + let message = ProfileChunkKafkaMessage { + organization_id: OrganizationId::new(1), + project_id: ProjectId::new(42), + received: 1234567890, + retention_days: 90, + headers: BTreeMap::new(), + payload: Bytes::from(b"{\"profile\":true}".as_ref()), + raw_profile: Some(Bytes::from(b"perfetto-binary-data".as_ref())), + raw_profile_content_type: Some(crate::envelope::ContentType::PerfettoTrace), + }; + let json = serde_json::to_value(&message).unwrap(); + assert_eq!(json["organization_id"], 1); + assert_eq!(json["project_id"], 42); + assert!(json.get("raw_profile").is_some()); + assert_eq!( + json["raw_profile_content_type"], + "application/x-perfetto-trace" + ); + } + #[test] fn disallow_outcomes() { struct TestMessage; diff --git a/tests/integration/test_profile_chunks_perfetto.py b/tests/integration/test_profile_chunks_perfetto.py index 6bc25384e90..f2a99b7adc7 100644 --- a/tests/integration/test_profile_chunks_perfetto.py +++ b/tests/integration/test_profile_chunks_perfetto.py @@ -102,3 +102,7 @@ def test_perfetto_profile_chunk_end_to_end( for tid, meta in thread_metadata.items(): assert isinstance(tid, str) assert "name" in meta and isinstance(meta["name"], str) + + assert "raw_profile" in profile, "expected raw_profile in Kafka message" + assert len(profile["raw_profile"]) == 97252, "raw_profile size mismatch" + assert profile.get("raw_profile_content_type") == "application/x-perfetto-trace"