diff --git a/server/cmd/api/api/api.go b/server/cmd/api/api/api.go index 6e5b4440..4bfc0d82 100644 --- a/server/cmd/api/api/api.go +++ b/server/cmd/api/api/api.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "os" "os/exec" "sync" @@ -20,6 +21,14 @@ import ( "github.com/kernel/kernel-images/server/lib/scaletozero" ) +type cdpMonitorController interface { + Start(ctx context.Context) error + Stop() + IsRunning() bool +} + +var _ cdpMonitorController = (*cdpmonitor.Monitor)(nil) + type ApiService struct { // defaultRecorderID is used whenever the caller doesn't specify an explicit ID. defaultRecorderID string @@ -73,7 +82,7 @@ type ApiService struct { // CDP event pipeline and cdpMonitor. captureSession *events.CaptureSession - cdpMonitor *cdpmonitor.Monitor + cdpMonitor cdpMonitorController monitorMu sync.Mutex lifecycleCtx context.Context lifecycleCancel context.CancelFunc @@ -103,7 +112,7 @@ func New( return nil, fmt.Errorf("captureSession cannot be nil") } - mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum) + mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum, slog.Default()) ctx, cancel := context.WithCancel(context.Background()) return &ApiService{ diff --git a/server/cmd/api/api/capture_session_test.go b/server/cmd/api/api/capture_session_test.go index bcd1f168..e6edd971 100644 --- a/server/cmd/api/api/capture_session_test.go +++ b/server/cmd/api/api/capture_session_test.go @@ -248,5 +248,12 @@ func newTestService(t *testing.T, mgr recorder.RecordManager) *ApiService { t.Helper() svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0) require.NoError(t, err) + svc.cdpMonitor = &stubCdpMonitor{} return svc } + +type stubCdpMonitor struct{} + +func (s *stubCdpMonitor) Start(_ context.Context) error { return nil } +func (s *stubCdpMonitor) Stop() {} +func (s *stubCdpMonitor) IsRunning() bool { return false } diff --git 
a/server/lib/cdpmonitor/README.md b/server/lib/cdpmonitor/README.md new file mode 100644 index 00000000..0bf194f3 --- /dev/null +++ b/server/lib/cdpmonitor/README.md @@ -0,0 +1,221 @@ +# CDP Monitor + +The monitor is the browser-facing layer of the kernel browser logging pipeline. It connects to Chrome's DevTools endpoint, tracks all page sessions via CDP's `Target.setAutoAttach`, and converts raw CDP notifications into typed `events.Event` values for downstream consumers. + +## Overview + +`cdpmonitor` manages a Chrome DevTools Protocol (CDP) WebSocket connection to a running Chrome browser. It subscribes to CDP events across all attached tabs, translates them into structured `events.Event` values, and publishes them via a caller-supplied `PublishFunc`. It also derives synthetic events from sequences of CDP events and takes screenshots on significant page activity. + +Chrome can restart independently of the monitor. When that happens, `UpstreamProvider` pushes a new DevTools URL and the monitor reconnects automatically, emitting lifecycle events so consumers can track continuity. + +## Event taxonomy + +**CDP-derived** (1-to-1 with a CDP notification): `console_log`, `console_error`, `network_request`, `network_response`, `network_loading_failed`, `page_tab_opened`, `page_navigation`, `page_dom_content_loaded`, `page_load`, `page_layout_shift` + +**Computed** (inferred from sequences of CDP events): `network_idle` (fires when in-flight requests drop to zero), `page_layout_settled` (1 s after `page_load` with no intervening layout shifts), `page_navigation_settled` (fires once `page_dom_content_loaded`, `network_idle`, and `page_layout_settled` have all fired for the same navigation). 
+ +**Interaction** (fired by `interaction.js` via `Runtime.bindingCalled`): `interaction_click`, `interaction_key`, `interaction_scroll_settled` + +**Monitor lifecycle** (emitted by the monitor itself, not by Chrome): `monitor_screenshot`, `monitor_disconnected`, `monitor_reconnected`, `monitor_reconnect_failed`, `monitor_init_failed` + +## Responsibilities + +| Concern | Where | +| --- | --- | +| WebSocket lifecycle (connect, read, reconnect) | `monitor.go` | +| CDP domain setup per session | `domains.go` | +| Event translation (CDP params to `events.Event`) | `handlers.go` | +| Synthetic event state machines | `computed.go` | +| Screenshot capture via ffmpeg | `screenshot.go` | +| CDP protocol types | `cdp_proto.go`, `types.go` | +| Interaction tracking injected into the page | `interaction.js` | +| Body/MIME capture sizing and text truncation helpers | `util.go` | + +## Internals + +### Reconnect model + +`subscribeToUpstream` listens to `UpstreamProvider.Subscribe()` for new DevTools URLs. On each URL change (indicating Chrome restarted), `handleUpstreamRestart` tears down the existing connection, dials the new URL with capped-exponential backoff (250 ms → 500 ms → 1 s → 2 s, up to 10 attempts), then restarts `readLoop` and re-initializes all CDP sessions. `restartMu` serializes concurrent restart signals so rapid Chrome restarts do not produce overlapping reconnects. 
+ +### Goroutines + +| Goroutine | Lifetime | Tracked by | +| --- | --- | --- | +| `readLoop` | one per WebSocket connection | `done` channel | +| `subscribeToUpstream` | same as `lifecycleCtx` | `asyncWg` | +| `sweepPendingRequests` | same as `lifecycleCtx` | `asyncWg` | +| `initSession` | short-lived, one per connect or reconnect | `asyncWg` | +| `attachExistingTargets` wrapper | short-lived, one per existing target on reconnect | `asyncWg` | +| `enableDomains` + `injectScript` | short-lived, one per target attach | `asyncWg` | +| `fetchResponseBody` | one per completed network request | `asyncWg` | +| `captureScreenshot` | one per screenshot trigger | `asyncWg` | + +`Stop()` cancels `lifecycleCtx`, waits for `readLoop` via `done`, then waits for all other goroutines via `asyncWg` before closing the connection. + +### Lock ordering + +Locks must be acquired left to right. Never acquire a lock on the left while holding one further right. + +``` +restartMu -> lifeMu -> pendReqMu -> computed.mu -> pendMu +restartMu -> lifeMu -> sessionsMu +``` + +`computed.mu` and `sessionsMu` are never held simultaneously; `cs.stop()` and `cs.resetOnNavigation()` are called only after the relevant `sessionsMu` critical section is complete. + +`bindingRateMu` is independent of this ordering and is always acquired alone. 
+ +| Lock | Protects | +| --- | --- | +| `restartMu` | Serializes `handleUpstreamRestart` to prevent overlapping reconnects from rapid Chrome restarts | +| `lifeMu` | `conn`, `lifecycleCtx`, `cancel`, `done`, `readReady` -- all fields that change during Start / Stop / reconnect | +| `pendReqMu` | `pendingRequests` (requestId -> `networkReqState`): in-flight network requests accumulating request/response metadata until `loadingFinished` | +| `computed.mu` | All `computedState` fields: counters and timers for the `network_idle`, `page_layout_settled`, and `page_navigation_settled` state machines | +| `pendMu` | `pending` (id -> reply channel): in-flight CDP commands waiting for a response from Chrome | +| `sessionsMu` | `sessions` (sessionID -> `targetInfo`): the set of currently attached CDP targets (tabs, iframes, workers) | +| `bindingRateMu` | `bindingLastSeen` (sessionID:eventType -> time): rate-limit state for `__kernelEvent` binding calls | + +Fields that need no mutex use `sync/atomic`: `nextID`, `mainSessionID`, `running`, `lastScreenshotAt`, `screenshotInFlight`. + +### WebSocket concurrency + +`coder/websocket` guarantees one concurrent `Read` and one concurrent `Write` are safe on the same connection. `readLoop` is the sole reader. All writes go through `send`, which calls `conn.Write` directly -- `conn.Write` is internally serialized by the library, so no external write mutex is needed. + +## Event data model + +### Envelope and top-level fields + +Every event arrives as an `Envelope`: + +```json +{ + "capture_session_id": "cs_abc123", + "seq": 42, + "event": { + "ts": 1746123456789000, + "type": "network_request", + "category": "network", + "source": { ... }, + "data": { ... }, + "truncated": false + } +} +``` + +| Field | Type | Description | +| --- | --- | --- | +| `capture_session_id` | string | Pipeline-assigned ID for the capture session (not a CDP concept). | +| `seq` | uint64 | Monotonically increasing per-capture-session sequence number. 
| +| `event.ts` | int64 | Wall-clock time the monitor emitted the event, as **Unix microseconds** (µs since epoch). | +| `event.type` | string | See [Event taxonomy](#event-taxonomy). | +| `event.category` | string | One of: `console`, `network`, `page`, `interaction`, `system`. | +| `event.truncated` | bool | `true` if `data` was nulled to fit the 1 MB pipeline limit. | + +### Source object + +```json +"source": { + "kind": "cdp", + "event": "Network.requestWillBeSent", + "metadata": { + "cdp_session_id": "...", + "target_id": "...", + "target_type": "page" + } +} +``` + +| Field | Description | +| --- | --- | +| `event` | The raw CDP method that triggered the event (e.g. `Network.requestWillBeSent`). Empty for computed events. | +| `metadata.cdp_session_id` | The CDP WebSocket session multiplexer ID for this target. Changes if Chrome restarts. | +| `metadata.target_id` | Stable identifier for the browser target (tab/window). Survives navigations within the same tab. | +| `metadata.target_type` | Target type as reported by Chrome: `page`, `iframe`, `worker`, etc. | + +### CDP identity primer + +Five IDs appear across events. Understanding how they nest prevents confusion: + +``` +target_id <- one per tab/window; stable across navigations +└── cdp_session_id <- WebSocket multiplexer channel to that target; resets on Chrome restart + └── frame_id <- one per frame (top-level or iframe); changes on navigation + └── loader_id <- one per document load; links a navigation to its network requests + └── request_id <- one per request (stable across redirects in a chain) +``` + +| ID | Where it appears | What it identifies | +| --- | --- | --- | +| `target_id` | `source.metadata`, most `data` objects | The browser tab. Use this to group all events from one tab session. | +| `cdp_session_id` | `source.metadata` | The WebSocket sub-channel. Not stable across reconnects. 
| +| `frame_id` | `page_navigation`, `network_request`, `network_response`, `network_loading_failed` | The frame the request or navigation belongs to. Top-level frame has no `parent_frame_id`. | +| `source_frame_id` | `page_layout_shift` | The frame where the layout shift occurred. Distinct from the nav context `frame_id`, which is always the top-level navigated frame. | +| `loader_id` | `page_navigation`, `network_request`, `network_response` | The document load that owns a request. Join `network_request.loader_id` to `page_navigation.loader_id` to correlate requests with the navigation that triggered them. | +| `request_id` | `network_request`, `network_response`, `network_loading_failed` | A single request chain (including redirects). Links request to its eventual response or failure. | + +### Navigation context fields + +Most event `data` objects include a nav context block stamped at the last `page_navigation`. These fields reflect the top-level frame most recently navigated in the session: + +| Field | Description | +| --- | --- | +| `session_id` | Same as `source.metadata.cdp_session_id`. Repeated for data-only consumers. | +| `frame_id` | Frame ID of the navigated top-level frame. | +| `loader_id` | Loader ID of the current document. | +| `url` | URL of the current page at the time of the last navigation. | +| `nav_seq` | Monotonically increasing counter, incremented on each `page_navigation`. Use it to detect that the page has navigated between two events in the same session. | + +### Per-event data fields + +Fields below are the unique additions per event type. Unless otherwise noted, events also include the nav context fields described above. Network events are the exception: they carry their own `loader_id` and `frame_id` directly and do not include nav context. 
+ +#### Console events + +| Event | Unique fields | +| --- | --- | +| `console_log` | `level` (CDP type string), `text` (first arg), `args` (all args as strings), `stack_trace` | +| `console_error` | Same as `console_log` when `source.event` is `Runtime.consoleAPICalled`. When `source.event` is `Runtime.exceptionThrown`: `text`, `line`, `column`, `source_url` (script file URL, not page URL), `stack_trace`. | + +#### Network events + +| Event | Fields | +| --- | --- | +| `network_request` | `request_id`, `loader_id`, `frame_id`, `document_url`, `method`, `url`, `headers`, `initiator_type`. Optional: `post_data`, `resource_type`, `is_redirect` + `redirect_url`. | +| `network_response` | `request_id`, `loader_id`, `frame_id`, `method`, `url`, `status`, `headers`. Optional: `status_text`, `mime_type`, `resource_type`, `body` (truncated text body for textual MIME types). | +| `network_loading_failed` | `request_id`, `error_text`, `canceled`. Optional (absent when the request record was not found): `url`, `loader_id`, `frame_id`, `resource_type`. | + +#### Page events + +| Event | Unique fields | +| --- | --- | +| `page_tab_opened` | `target_id`, `target_type`, `url`, `opener_id`, `title`. Emitted before the first navigation; no nav context. | +| `page_navigation` | `session_id`, `target_id`, `target_type`, `url`, `frame_id`, `parent_frame_id` (absent for top-level frames), `loader_id`. This event establishes the nav context stamped on all subsequent events for the session. | +| `page_dom_content_loaded` | Nav context + `cdp_timestamp` (CDP monotonic seconds; not a wall-clock timestamp -- use `event.ts` for ordering). | +| `page_load` | Nav context + `cdp_timestamp` (CDP monotonic seconds). | +| `page_layout_shift` | Nav context + `source_frame_id`, `time`, `duration`. Optional `layout_shift_details` object: `score`, `had_recent_input`. Optional `lcp_details` object: `render_time`, `load_time`, `size`, `element_id`, `url`, `node_id`. 
Chrome multiplexes LCP candidate data through the same `PerformanceTimeline.timelineEventAdded` notification, so both may appear on a single event. | + +#### Computed events + +`network_idle`, `page_layout_settled`, and `page_navigation_settled` carry nav context fields only. + +#### Interaction events + +All interaction events include nav context plus the fields below. + +| Event | Unique fields | +| --- | --- | +| `interaction_click` | `x`, `y` (viewport coords), `selector` (CSS selector of clicked element), `tag`, `text` (element text; empty for sensitive inputs). | +| `interaction_key` | `key` (key name), `selector`, `tag`. Not emitted for sensitive input fields. | +| `interaction_scroll_settled` | `from_x`, `from_y`, `to_x`, `to_y` (scroll positions in px), `target_selector`. | + +#### Monitor lifecycle events + +Lifecycle events use `source.kind = "local_process"` and carry no nav context, except `monitor_screenshot` which includes nav context alongside the image payload. + +| Event | Fields | +| --- | --- | +| `monitor_screenshot` | Nav context + `png` (base64-encoded PNG). | +| `monitor_disconnected` | `reason: "chrome_restarted"`. | +| `monitor_reconnected` | `reconnect_duration_ms`. | +| `monitor_reconnect_failed` | `reason: "reconnect_exhausted"`. | +| `monitor_init_failed` | `step` (name of the init step that failed, e.g. `"Target.setAutoAttach"`). | \ No newline at end of file diff --git a/server/lib/cdpmonitor/cdp_proto.go b/server/lib/cdpmonitor/cdp_proto.go new file mode 100644 index 00000000..61f4282e --- /dev/null +++ b/server/lib/cdpmonitor/cdp_proto.go @@ -0,0 +1,286 @@ +package cdpmonitor + +// Layer-1 PDL-faithful CDP types — one struct per dispatched event, retaining +// every top-level field the PDL declares. Layer-2 projection lives in +// handlers.go. +// +// Invariants: +// - Every PDL field for a handled event appears here. Missing fields fail the +// round-trip test in cdp_proto_test.go. 
+// - omitempty on PDL-optional fields, omitted on PDL-required fields. A +// required false/0/"" must round-trip, not vanish. +// - Complex sub-objects (StackTrace, Initiator, ResourceTiming, etc.) stay as +// json.RawMessage — retained, not typed. Callers unmarshal when needed. +// - Network.Headers stays raw: PDL says string→string but some Chromium +// builds emit non-string values. +// +// Naming: cdp<Domain><Name>, unexported. The cdp prefix avoids collisions +// with stdlib and the events package. +// +// PDL source: https://chromedevtools.github.io/devtools-protocol/tot/ +// Written against Chrome M146 (ChromeDriver 146.0.7680.165). + +import "encoding/json" + +// --- Runtime domain --- + +// cdpRuntimeRemoteObject mirrors Runtime.RemoteObject. +type cdpRuntimeRemoteObject struct { + Type string `json:"type"` + Subtype string `json:"subtype,omitempty"` + ClassName string `json:"className,omitempty"` + Value json.RawMessage `json:"value,omitempty"` + UnserializableValue string `json:"unserializableValue,omitempty"` + Description string `json:"description,omitempty"` + DeepSerializedValue json.RawMessage `json:"deepSerializedValue,omitempty"` + ObjectID string `json:"objectId,omitempty"` + Preview json.RawMessage `json:"preview,omitempty"` + CustomPreview json.RawMessage `json:"customPreview,omitempty"` +} + +// cdpRuntimeConsoleAPICalledParams mirrors Runtime.consoleAPICalled params. +type cdpRuntimeConsoleAPICalledParams struct { + Type string `json:"type"` + Args []cdpRuntimeRemoteObject `json:"args"` + ExecutionContextID int `json:"executionContextId"` + Timestamp float64 `json:"timestamp"` + StackTrace json.RawMessage `json:"stackTrace,omitempty"` + Context string `json:"context,omitempty"` +} + +// cdpRuntimeExceptionDetails mirrors Runtime.ExceptionDetails. +// Exception is kept as json.RawMessage to avoid coupling to the RemoteObject +// surface used only inside exception payloads. 
type cdpRuntimeExceptionDetails struct {
	// Always written on re-marshal, even when zero.
	ExceptionID  int    `json:"exceptionId"`
	Text         string `json:"text"`
	LineNumber   int    `json:"lineNumber"`
	ColumnNumber int    `json:"columnNumber"`

	// Tagged omitempty: left out of the re-marshaled JSON when empty.
	ScriptID           string          `json:"scriptId,omitempty"`
	URL                string          `json:"url,omitempty"`
	StackTrace         json.RawMessage `json:"stackTrace,omitempty"`
	Exception          json.RawMessage `json:"exception,omitempty"`
	ExecutionContextID int             `json:"executionContextId,omitempty"`
	ExceptionMetaData  json.RawMessage `json:"exceptionMetaData,omitempty"`
}

// cdpRuntimeExceptionThrownParams is the params object of a
// Runtime.exceptionThrown notification.
type cdpRuntimeExceptionThrownParams struct {
	Timestamp        float64                    `json:"timestamp"`
	ExceptionDetails cdpRuntimeExceptionDetails `json:"exceptionDetails"`
}

// cdpRuntimeBindingCalledParams is the params object of a
// Runtime.bindingCalled notification. Payload is kept as the string that
// arrived on the wire; it is not decoded at this layer.
type cdpRuntimeBindingCalledParams struct {
	Name               string `json:"name"`
	Payload            string `json:"payload"`
	ExecutionContextID int    `json:"executionContextId"`
}

// --- Network domain ---

// cdpNetworkRequest is the Layer-1 mirror of Network.Request. Headers,
// PostDataEntries and TrustTokenParams are carried as raw JSON rather than
// typed values.
type cdpNetworkRequest struct {
	URL              string          `json:"url"`
	URLFragment      string          `json:"urlFragment,omitempty"`
	Method           string          `json:"method"`
	Headers          json.RawMessage `json:"headers"`
	PostData         string          `json:"postData,omitempty"`
	HasPostData      bool            `json:"hasPostData,omitempty"`
	PostDataEntries  json.RawMessage `json:"postDataEntries,omitempty"`
	MixedContentType string          `json:"mixedContentType,omitempty"`
	InitialPriority  string          `json:"initialPriority"`
	ReferrerPolicy   string          `json:"referrerPolicy,omitempty"`
	IsLinkPreload    bool            `json:"isLinkPreload,omitempty"`
	TrustTokenParams json.RawMessage `json:"trustTokenParams,omitempty"`
	IsSameSite       bool            `json:"isSameSite,omitempty"`
	IsAdRelated      bool            `json:"isAdRelated,omitempty"`
}

// cdpNetworkResponse mirrors Network.Response.
//
// connectionReused, fromDiskCache, fromServiceWorker and fromPrefetchCache are
// deliberately tagged without omitempty, so a false value decoded from Chrome
// is written back as false on re-marshal instead of being dropped.
type cdpNetworkResponse struct {
	// Status line, headers and content type.
	URL                string          `json:"url"`
	Status             int             `json:"status"`
	StatusText         string          `json:"statusText"`
	Headers            json.RawMessage `json:"headers"`
	HeadersText        string          `json:"headersText,omitempty"`
	MimeType           string          `json:"mimeType"`
	Charset            string          `json:"charset,omitempty"`
	RequestHeaders     json.RawMessage `json:"requestHeaders,omitempty"`
	RequestHeadersText string          `json:"requestHeadersText,omitempty"`

	// Connection details.
	ConnectionReused bool    `json:"connectionReused"`
	ConnectionID     float64 `json:"connectionId"`
	RemoteIPAddress  string  `json:"remoteIPAddress,omitempty"`
	RemotePort       int     `json:"remotePort,omitempty"`

	// Where the response was served from.
	FromDiskCache           bool            `json:"fromDiskCache"`
	FromServiceWorker       bool            `json:"fromServiceWorker"`
	FromPrefetchCache       bool            `json:"fromPrefetchCache"`
	FromEarlyHints          bool            `json:"fromEarlyHints,omitempty"`
	ServiceWorkerRouterInfo json.RawMessage `json:"serviceWorkerRouterInfo,omitempty"`

	// Transfer size, timing and protocol.
	EncodedDataLength           float64         `json:"encodedDataLength"`
	Timing                      json.RawMessage `json:"timing,omitempty"`
	ServiceWorkerResponseSource string          `json:"serviceWorkerResponseSource,omitempty"`
	ResponseTime                float64         `json:"responseTime,omitempty"`
	CacheStorageCacheName       string          `json:"cacheStorageCacheName,omitempty"`
	Protocol                    string          `json:"protocol,omitempty"`
	AlternateProtocolUsage      string          `json:"alternateProtocolUsage,omitempty"`

	// Security.
	SecurityState   string          `json:"securityState"`
	SecurityDetails json.RawMessage `json:"securityDetails,omitempty"`
}

// cdpNetworkRequestWillBeSentParams mirrors Network.requestWillBeSent params.
// Type (ResourceType) is PDL-required — the old projection used the wrong wire
// key "resourceType" which never matched real Chrome output.
+type cdpNetworkRequestWillBeSentParams struct { + RequestID string `json:"requestId"` + LoaderID string `json:"loaderId"` + DocumentURL string `json:"documentURL"` + Request cdpNetworkRequest `json:"request"` + Timestamp float64 `json:"timestamp"` + WallTime float64 `json:"wallTime"` + Initiator json.RawMessage `json:"initiator"` + RedirectHasExtraInfo bool `json:"redirectHasExtraInfo,omitempty"` + RedirectResponse json.RawMessage `json:"redirectResponse,omitempty"` + Type string `json:"type"` + FrameID string `json:"frameId,omitempty"` + HasUserGesture bool `json:"hasUserGesture,omitempty"` + RenderBlockingBehavior string `json:"renderBlockingBehavior,omitempty"` +} + +// cdpNetworkResponseReceivedParams mirrors Network.responseReceived params. +type cdpNetworkResponseReceivedParams struct { + RequestID string `json:"requestId"` + LoaderID string `json:"loaderId"` + Timestamp float64 `json:"timestamp"` + Type string `json:"type"` + Response cdpNetworkResponse `json:"response"` + HasExtraInfo bool `json:"hasExtraInfo,omitempty"` + FrameID string `json:"frameId,omitempty"` +} + +// cdpNetworkLoadingFinishedParams mirrors Network.loadingFinished params. +type cdpNetworkLoadingFinishedParams struct { + RequestID string `json:"requestId"` + Timestamp float64 `json:"timestamp"` + EncodedDataLength float64 `json:"encodedDataLength"` +} + +// cdpNetworkLoadingFailedParams mirrors Network.loadingFailed params. +type cdpNetworkLoadingFailedParams struct { + RequestID string `json:"requestId"` + Timestamp float64 `json:"timestamp"` + Type string `json:"type"` + ErrorText string `json:"errorText"` + Canceled bool `json:"canceled,omitempty"` + BlockedReason string `json:"blockedReason,omitempty"` + CorsErrorStatus json.RawMessage `json:"corsErrorStatus,omitempty"` +} + +// --- Page domain --- + +// cdpPageFrame mirrors Page.Frame. 
type cdpPageFrame struct {
	// Identity of the frame and of the document currently loaded in it.
	ID       string `json:"id"`
	ParentID string `json:"parentId,omitempty"`
	LoaderID string `json:"loaderId"`
	Name     string `json:"name,omitempty"`

	// Location and origin.
	URL                   string          `json:"url"`
	URLFragment           string          `json:"urlFragment,omitempty"`
	DomainAndRegistry     string          `json:"domainAndRegistry,omitempty"`
	SecurityOrigin        string          `json:"securityOrigin"`
	SecurityOriginDetails json.RawMessage `json:"securityOriginDetails,omitempty"`
	MimeType              string          `json:"mimeType"`
	UnreachableURL        string          `json:"unreachableUrl,omitempty"`

	// Remaining frame attributes; the structured ones are retained as raw JSON.
	AdFrameStatus                  json.RawMessage `json:"adFrameStatus,omitempty"`
	SecureContextType              string          `json:"secureContextType,omitempty"`
	CrossOriginIsolatedContextType string          `json:"crossOriginIsolatedContextType,omitempty"`
	GatedAPIFeatures               json.RawMessage `json:"gatedAPIFeatures,omitempty"`
}

// cdpPageFrameNavigatedParams is the params object of a Page.frameNavigated
// notification.
type cdpPageFrameNavigatedParams struct {
	Frame cdpPageFrame `json:"frame"`
	Type  string       `json:"type,omitempty"`
}

// cdpPageDomContentEventFiredParams is the params object of a
// Page.domContentEventFired notification.
type cdpPageDomContentEventFiredParams struct {
	Timestamp float64 `json:"timestamp"`
}

// cdpPageLoadEventFiredParams is the params object of a Page.loadEventFired
// notification.
type cdpPageLoadEventFiredParams struct {
	Timestamp float64 `json:"timestamp"`
}

// --- PerformanceTimeline domain ---

// cdpPerformanceTimelineEvent mirrors PerformanceTimeline.TimelineEvent.
// Only one of lcpDetails / layoutShiftDetails is populated per event,
// depending on Type.
type cdpPerformanceTimelineEvent struct {
	// FrameID and Name are PDL-required strings that Chrome may legitimately
	// send empty (frameId for non-frame targets, name for entry types such as
	// largest-contentful-paint). They therefore carry no omitempty, so an
	// empty value round-trips as "" instead of vanishing.
	FrameID string  `json:"frameId"`
	Type    string  `json:"type"`
	Name    string  `json:"name"`
	Time    float64 `json:"time"`

	// PDL-optional: dropped from the re-marshaled output when empty. The two
	// details payloads are retained as raw JSON; cdpLcpDetails and
	// cdpLayoutShiftDetails below are the typed forms callers can decode into.
	Duration           float64         `json:"duration,omitempty"`
	LcpDetails         json.RawMessage `json:"lcpDetails,omitempty"`
	LayoutShiftDetails json.RawMessage `json:"layoutShiftDetails,omitempty"`
}

// cdpPerformanceTimelineEventAddedParams mirrors
// PerformanceTimeline.timelineEventAdded params.
type cdpPerformanceTimelineEventAddedParams struct {
	Event cdpPerformanceTimelineEvent `json:"event"`
}

// cdpLayoutShiftDetails mirrors PerformanceTimeline.LayoutShiftDetails (PDL wire format).
type cdpLayoutShiftDetails struct {
	Score          float64 `json:"score"`
	HadRecentInput bool    `json:"hadRecentInput"`
}

// cdpLcpDetails mirrors PerformanceTimeline.LargestContentfulPaintDetails (PDL wire format).
type cdpLcpDetails struct {
	RenderTime float64 `json:"renderTime"`
	LoadTime   float64 `json:"loadTime"`
	Size       float64 `json:"size"`
	ElementID  string  `json:"elementId,omitempty"`
	URL        string  `json:"url,omitempty"`
	NodeID     int     `json:"nodeId,omitempty"`
}

// --- Target domain ---

// cdpTargetTargetInfo mirrors Target.TargetInfo.
type cdpTargetTargetInfo struct {
	TargetID         string `json:"targetId"`
	Type             string `json:"type"`
	Title            string `json:"title"`
	URL              string `json:"url"`
	Attached         bool   `json:"attached"`
	OpenerID         string `json:"openerId,omitempty"`
	CanAccessOpener  bool   `json:"canAccessOpener,omitempty"`
	OpenerFrameID    string `json:"openerFrameId,omitempty"`
	ParentFrameID    string `json:"parentFrameId,omitempty"`
	BrowserContextID string `json:"browserContextId,omitempty"`
	Subtype          string `json:"subtype,omitempty"`
}

// cdpTargetAttachedToTargetParams mirrors Target.attachedToTarget params.
+type cdpTargetAttachedToTargetParams struct { + SessionID string `json:"sessionId"` + TargetInfo cdpTargetTargetInfo `json:"targetInfo"` + WaitingForDebugger bool `json:"waitingForDebugger"` +} + +// cdpTargetDetachedFromTargetParams mirrors Target.detachedFromTarget params. +type cdpTargetDetachedFromTargetParams struct { + SessionID string `json:"sessionId"` + TargetID string `json:"targetId,omitempty"` +} diff --git a/server/lib/cdpmonitor/cdp_proto_test.go b/server/lib/cdpmonitor/cdp_proto_test.go new file mode 100644 index 00000000..7a9d0490 --- /dev/null +++ b/server/lib/cdpmonitor/cdp_proto_test.go @@ -0,0 +1,58 @@ +package cdpmonitor + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestLayer1Roundtrip verifies that each Layer-1 struct retains every field +// from a captured real-Chrome JSON frame. For each type we decode a fixture +// into the struct, re-marshal the struct, and JSON-compare the two. Any +// non-equivalence is an audit failure — a PDL field has been forgotten, +// silently type-coerced, or mis-tagged. +// +// Fixtures deliberately use non-zero values for all bool/numeric fields +// marked with omitempty so that the round-trip actually exercises retention. +// Chrome never distinguishes "absent" from "zero" on the wire, so omitempty +// is correct; the audit cares about fields with real data being preserved. 
+func TestLayer1Roundtrip(t *testing.T) { + cases := []struct { + name string + fixture string + target func() any + }{ + {"Runtime.consoleAPICalled", "Runtime_consoleAPICalled.json", func() any { return new(cdpRuntimeConsoleAPICalledParams) }}, + {"Runtime.exceptionThrown", "Runtime_exceptionThrown.json", func() any { return new(cdpRuntimeExceptionThrownParams) }}, + {"Runtime.bindingCalled", "Runtime_bindingCalled.json", func() any { return new(cdpRuntimeBindingCalledParams) }}, + {"Network.requestWillBeSent", "Network_requestWillBeSent.json", func() any { return new(cdpNetworkRequestWillBeSentParams) }}, + {"Network.responseReceived", "Network_responseReceived.json", func() any { return new(cdpNetworkResponseReceivedParams) }}, + {"Network.loadingFinished", "Network_loadingFinished.json", func() any { return new(cdpNetworkLoadingFinishedParams) }}, + {"Network.loadingFailed", "Network_loadingFailed.json", func() any { return new(cdpNetworkLoadingFailedParams) }}, + {"Page.frameNavigated", "Page_frameNavigated.json", func() any { return new(cdpPageFrameNavigatedParams) }}, + {"Page.domContentEventFired", "Page_domContentEventFired.json", func() any { return new(cdpPageDomContentEventFiredParams) }}, + {"Page.loadEventFired", "Page_loadEventFired.json", func() any { return new(cdpPageLoadEventFiredParams) }}, + {"PerformanceTimeline.timelineEventAdded", "PerformanceTimeline_timelineEventAdded.json", func() any { return new(cdpPerformanceTimelineEventAddedParams) }}, + {"Target.attachedToTarget", "Target_attachedToTarget.json", func() any { return new(cdpTargetAttachedToTargetParams) }}, + {"Target.detachedFromTarget", "Target_detachedFromTarget.json", func() any { return new(cdpTargetDetachedFromTargetParams) }}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + raw, err := os.ReadFile(filepath.Join("testdata", tc.fixture)) + require.NoError(t, err, "read fixture %s", tc.fixture) + + dst := tc.target() + require.NoError(t, 
json.Unmarshal(raw, dst), "unmarshal fixture into Layer-1 struct") + + reMarshaled, err := json.Marshal(dst) + require.NoError(t, err, "re-marshal Layer-1 struct") + + require.JSONEq(t, string(raw), string(reMarshaled), + "Layer-1 struct dropped or mis-typed a field — diff fixture vs. re-marshaled output to find the missing field") + }) + } +} diff --git a/server/lib/cdpmonitor/cdp_test.go b/server/lib/cdpmonitor/cdp_test.go new file mode 100644 index 00000000..91ade37e --- /dev/null +++ b/server/lib/cdpmonitor/cdp_test.go @@ -0,0 +1,380 @@ +package cdpmonitor + +import ( + "bytes" + "context" + "encoding/json" + "image" + "image/color" + "image/png" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/coder/websocket" + "github.com/coder/websocket/wsjson" + "github.com/kernel/kernel-images/server/lib/events" + "github.com/stretchr/testify/require" +) + +var discardLogger = slog.New(slog.NewTextHandler(io.Discard, nil)) + +// minimalPNG is a valid 1x1 PNG used as a test fixture for screenshot tests. +var minimalPNG = func() []byte { + img := image.NewRGBA(image.Rect(0, 0, 1, 1)) + img.Set(0, 0, color.RGBA{R: 255, G: 0, B: 0, A: 255}) + var buf bytes.Buffer + _ = png.Encode(&buf, img) + return buf.Bytes() +}() + +// testServer is a minimal WebSocket server that accepts connections and +// lets the test drive scripted message sequences. 
+type testServer struct { + srv *httptest.Server + conn *websocket.Conn + connMu sync.Mutex + connCh chan struct{} // closed when the first connection is accepted + msgCh chan []byte // inbound messages from Monitor +} + +func newTestServer(t *testing.T) *testServer { + t.Helper() + s := &testServer{ + msgCh: make(chan []byte, 128), + connCh: make(chan struct{}), + } + var connOnce sync.Once + s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := websocket.Accept(w, r, &websocket.AcceptOptions{InsecureSkipVerify: true}) + if err != nil { + return + } + s.connMu.Lock() + s.conn = c + s.connMu.Unlock() + connOnce.Do(func() { close(s.connCh) }) + go func() { + for { + _, b, err := c.Read(context.Background()) + if err != nil { + return + } + s.msgCh <- b + } + }() + })) + return s +} + +func (s *testServer) wsURL() string { + return "ws" + strings.TrimPrefix(s.srv.URL, "http") +} + +func (s *testServer) sendToMonitor(t *testing.T, msg any) { + t.Helper() + s.connMu.Lock() + c := s.conn + s.connMu.Unlock() + require.NotNil(t, c, "no active connection") + require.NoError(t, wsjson.Write(context.Background(), c, msg)) +} + +func (s *testServer) readFromMonitor(t *testing.T, timeout time.Duration) cdpMessage { + t.Helper() + select { + case b := <-s.msgCh: + var msg cdpMessage + require.NoError(t, json.Unmarshal(b, &msg)) + return msg + case <-time.After(timeout): + t.Fatal("timeout waiting for message from Monitor") + return cdpMessage{} + } +} + +func (s *testServer) close() { + s.connMu.Lock() + if s.conn != nil { + _ = s.conn.Close(websocket.StatusNormalClosure, "done") + } + s.connMu.Unlock() + s.srv.Close() +} + +// testUpstream implements UpstreamProvider for tests. 
+type testUpstream struct { + mu sync.Mutex + current string + subs []chan string +} + +func newTestUpstream(url string) *testUpstream { + return &testUpstream{current: url} +} + +func (u *testUpstream) Current() string { + u.mu.Lock() + defer u.mu.Unlock() + return u.current +} + +func (u *testUpstream) Subscribe() (<-chan string, func()) { + ch := make(chan string, 1) + u.mu.Lock() + u.subs = append(u.subs, ch) + u.mu.Unlock() + cancel := func() { + u.mu.Lock() + for i, s := range u.subs { + if s == ch { + u.subs = append(u.subs[:i], u.subs[i+1:]...) + break + } + } + u.mu.Unlock() + close(ch) + } + return ch, cancel +} + +func (u *testUpstream) notifyRestart(newURL string) { + u.mu.Lock() + u.current = newURL + subs := make([]chan string, len(u.subs)) + copy(subs, u.subs) + u.mu.Unlock() + for _, ch := range subs { + select { + case ch <- newURL: + default: + } + } +} + +// eventCollector captures published events with channel-based notification. +type eventCollector struct { + mu sync.Mutex + events []events.Event + notify chan struct{} // signaled on every publish +} + +func newEventCollector() *eventCollector { + return &eventCollector{notify: make(chan struct{}, 256)} +} + +func (c *eventCollector) publishFn() PublishFunc { + return func(ev events.Event) { + c.mu.Lock() + c.events = append(c.events, ev) + c.mu.Unlock() + select { + case c.notify <- struct{}{}: + default: + } + } +} + +// waitFor blocks until an event of the given type is published, or fails. 
+func (c *eventCollector) waitFor(t *testing.T, eventType string, timeout time.Duration) events.Event { + t.Helper() + deadline := time.After(timeout) + for { + c.mu.Lock() + for _, ev := range c.events { + if ev.Type == eventType { + c.mu.Unlock() + return ev + } + } + c.mu.Unlock() + select { + case <-c.notify: + case <-deadline: + t.Fatalf("timeout waiting for event type=%q", eventType) + return events.Event{} + } + } +} + +// waitForNew blocks until a NEW event of the given type is published after this +// call, ignoring any events already in the collector. +func (c *eventCollector) waitForNew(t *testing.T, eventType string, timeout time.Duration) events.Event { + t.Helper() + c.mu.Lock() + skip := len(c.events) + c.mu.Unlock() + + deadline := time.After(timeout) + for { + c.mu.Lock() + for i := skip; i < len(c.events); i++ { + if c.events[i].Type == eventType { + ev := c.events[i] + c.mu.Unlock() + return ev + } + } + c.mu.Unlock() + select { + case <-c.notify: + case <-deadline: + t.Fatalf("timeout waiting for new event type=%q", eventType) + return events.Event{} + } + } +} + +// assertNone verifies that no event of the given type arrives within d. +func (c *eventCollector) assertNone(t *testing.T, eventType string, d time.Duration) { + t.Helper() + deadline := time.After(d) + for { + select { + case <-c.notify: + c.mu.Lock() + for _, ev := range c.events { + if ev.Type == eventType { + c.mu.Unlock() + t.Fatalf("unexpected event %q published", eventType) + return + } + } + c.mu.Unlock() + case <-deadline: + return + } + } +} + +// ResponderFunc is called for each CDP command the Monitor sends. +// Return nil to use the default empty result. +type ResponderFunc func(msg cdpMessage) any + +// listenAndRespond drains srv.msgCh, calls fn for each command, and sends the +// response. If fn is nil or returns nil, sends {"id": msg.ID, "result": {}}. 
+func listenAndRespond(srv *testServer, stopCh <-chan struct{}, fn ResponderFunc) { + for { + select { + case b := <-srv.msgCh: + var msg cdpMessage + if json.Unmarshal(b, &msg) != nil || msg.ID == nil { + continue + } + srv.connMu.Lock() + c := srv.conn + srv.connMu.Unlock() + if c == nil { + continue + } + var resp any + if fn != nil { + resp = fn(msg) + } + if resp == nil { + resp = map[string]any{"id": msg.ID, "result": map[string]any{}} + } + _ = wsjson.Write(context.Background(), c, resp) + case <-stopCh: + return + } + } +} + +// startMonitor creates a Monitor against srv, starts it, and returns a cleanup func. +// Waits for Target.getTargets (the last command in initSession) before returning. +func startMonitor(t *testing.T, srv *testServer, fn ResponderFunc) (*Monitor, *eventCollector, func()) { + t.Helper() + ec := newEventCollector() + upstream := newTestUpstream(srv.wsURL()) + m := New(upstream, ec.publishFn(), 99, discardLogger) + require.NoError(t, m.Start(context.Background())) + + // Closed when Target.getTargets is responded to (last command of initSession). + // Tests needing attachExistingTargets to finish should use require.Eventually. + initDone := make(chan struct{}) + var initOnce sync.Once + + wrappedFn := func(msg cdpMessage) any { + var result any + if fn != nil { + result = fn(msg) + } + if msg.Method == "Target.getTargets" { + initOnce.Do(func() { close(initDone) }) + } + return result + } + + stopResponder := make(chan struct{}) + go listenAndRespond(srv, stopResponder, wrappedFn) + + // Wait for the websocket connection to be established. + select { + case <-srv.connCh: + case <-time.After(3 * time.Second): + t.Fatal("fake server never received a connection") + } + // Wait for the init sequence to complete. 
+ select { + case <-initDone: + case <-time.After(5 * time.Second): + t.Fatal("init sequence (Target.getTargets) did not complete") + } + + cleanup := func() { + close(stopResponder) + m.Stop() + } + return m, ec, cleanup +} + +// newComputedMonitor creates an unconnected Monitor for testing computed state +// (network_idle, layout_settled, navigation_settled) without a real websocket. +func newComputedMonitor(t *testing.T) (*Monitor, *eventCollector) { + t.Helper() + ec := newEventCollector() + upstream := newTestUpstream("ws://127.0.0.1:0") + m := New(upstream, ec.publishFn(), 0, discardLogger) + return m, ec +} + +// navigateMonitor sends a Page.frameNavigated to reset computed state. +// It ensures session "s1" has a page-like computedState before navigating, +// mirroring what handleAttachedToTarget would do in production. +func navigateMonitor(m *Monitor, url string) { + m.sessionsMu.Lock() + if _, ok := m.sessions["s1"]; !ok { + m.sessions["s1"] = targetInfo{targetID: "test-target", targetType: targetTypePage} + m.computedStates["s1"] = newComputedState(m.publish) + } + m.sessionsMu.Unlock() + m.handleFrameNavigated(cdpPageFrameNavigatedParams{ + Frame: cdpPageFrame{ID: "f1", URL: url}, + }, "s1") +} + +// simulateRequest sends a Network.requestWillBeSent through the handler. +func simulateRequest(m *Monitor, id string) { + m.handleNetworkRequest(cdpNetworkRequestWillBeSentParams{ + RequestID: id, + Type: "Document", + Request: cdpNetworkRequest{ + Method: "GET", + URL: "https://example.com/" + id, + }, + }, "s1") +} + +// simulateFinished stores minimal state and sends Network.loadingFinished. 
+func simulateFinished(m *Monitor, id string) { + m.pendReqMu.Lock() + m.pendingRequests[id] = networkReqState{sessionID: "s1", method: "GET", url: "https://example.com/" + id} + m.pendReqMu.Unlock() + m.handleLoadingFinished(context.Background(), cdpNetworkLoadingFinishedParams{RequestID: id}, "s1") +} diff --git a/server/lib/cdpmonitor/computed.go b/server/lib/cdpmonitor/computed.go new file mode 100644 index 00000000..fe7d11d3 --- /dev/null +++ b/server/lib/cdpmonitor/computed.go @@ -0,0 +1,325 @@ +package cdpmonitor + +import ( + "encoding/json" + "maps" + "sync" + "time" + + "github.com/kernel/kernel-images/server/lib/events" +) + +const ( + // networkIdleDebounce matches Playwright's networkidle heuristic: fire after + // 500 ms with no in-flight network requests. + networkIdleDebounce = 500 * time.Millisecond + // layoutSettledDebounce gives the page 1 s after the last layout shift (or + // page_load if no shifts occur) before declaring the layout stable. 1 s is + // chosen to cover typical late-loading web fonts and deferred image reflows. + layoutSettledDebounce = 1 * time.Second +) + +// computedState holds the mutable state for all computed meta-events. +type computedState struct { + mu sync.Mutex + publish PublishFunc + + // dead is set by stop(). Timer callbacks check it under mu and bail, + // preventing orphaned events after a session is detached or cleared. + dead bool + + // navSeq is incremented on every resetOnNavigation. AfterFunc callbacks + // capture their navSeq at creation and bail if it has changed, preventing + // stale timers from publishing events for a previous navigation. + navSeq int + + // navCtx is the navigation identity stamped at the last Page.frameNavigated. + // navData and navMeta are its precomputed JSON payload and Source.Metadata. + // Maps are replaced (not mutated) on each reset, so in-flight events holding + // a pointer to old navMeta are safe. 
+ navCtx navContext + navData json.RawMessage + navMeta map[string]string + + // network_idle: 500 ms debounce after all pending requests finish. + netPending int + netTimer *time.Timer + netFired bool + + // layout_settled: 1s after page_load with no intervening layout shifts. + layoutTimer *time.Timer + layoutFired bool + pageLoadSeen bool + + // navigation_settled: fires once dom_content_loaded, network_idle, and + // layout_settled have all fired after the same Page.frameNavigated. + navDOMLoaded bool + navNetIdle bool + navLayoutSettled bool + navFired bool +} + +// newComputedState creates a fresh computedState backed by the given publish func. +// navData is initialized to {} and navMeta to an empty map so events emitted +// before the first frameNavigated carry consistent empty payloads rather than null. +func newComputedState(publish PublishFunc) *computedState { + return &computedState{ + publish: publish, + navData: json.RawMessage(`{}`), + navMeta: make(map[string]string), + } +} + +// navSnapshot returns the precomputed nav payload and metadata under mu. +func (s *computedState) navSnapshot() (json.RawMessage, map[string]string) { + s.mu.Lock() + defer s.mu.Unlock() + return s.navData, s.navMeta +} + +// navDataWith merges extra fields into the current nav payload. +// Nav context fields (session_id, target_id, etc.) always take precedence over +// caller-supplied extra so a page-controlled payload cannot forge nav identity. +func (s *computedState) navDataWith(extra map[string]any) json.RawMessage { + result := make(map[string]any) + maps.Copy(result, extra) + if s != nil { + d, _ := s.navSnapshot() + base := make(map[string]any) + _ = json.Unmarshal(d, &base) + maps.Copy(result, base) + } + out, _ := json.Marshal(result) + return out +} + +func stopTimer(t *time.Timer) { + if t == nil { + return + } + if !t.Stop() { + select { + case <-t.C: + default: + } + } +} + +// stop marks the state machine dead and cancels pending timers. 
Called when the +// owning session detaches or the monitor reconnects. Any AfterFunc goroutine +// already running will check dead under mu and discard its result. +func (s *computedState) stop() { + s.mu.Lock() + s.dead = true + stopTimer(s.netTimer) + stopTimer(s.layoutTimer) + s.mu.Unlock() +} + +// resetOnNavigation resets all state machines. Called on Page.frameNavigated. +// Increments navSeq so any AfterFunc callbacks already running will discard their results. +// inflight seeds netPending; callers pass 0 because each session only tracks its +// own requests and starts fresh on navigation. +func (s *computedState) resetOnNavigation(inflight int, ctx navContext) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.navSeq++ + s.navCtx = ctx + navData, err := json.Marshal(map[string]any{ + "session_id": ctx.sessionID, + "target_id": ctx.targetID, + "target_type": ctx.targetType, + "frame_id": ctx.frameID, + "loader_id": ctx.loaderID, + "url": ctx.url, + "nav_seq": s.navSeq, + }) + if err != nil { + return err + } + s.navData = navData + s.navMeta = map[string]string{ + MetadataKeyCDPSessionID: ctx.sessionID, + MetadataKeyTargetID: ctx.targetID, + MetadataKeyTargetType: ctx.targetType, + } + + stopTimer(s.netTimer) + s.netTimer = nil + s.netPending = inflight + s.netFired = false + if inflight == 0 { + s.startNetIdleTimer() + } + + stopTimer(s.layoutTimer) + s.layoutTimer = nil + s.layoutFired = false + s.pageLoadSeen = false + + s.navDOMLoaded = false + s.navNetIdle = false + s.navLayoutSettled = false + s.navFired = false + return nil +} + +func (s *computedState) onRequest() { + s.mu.Lock() + defer s.mu.Unlock() + if s.dead { + return + } + s.netPending++ + // A new request invalidates any pending network_idle timer + stopTimer(s.netTimer) + s.netTimer = nil +} + +// onLoadingFinished is called on Network.loadingFinished or Network.loadingFailed. 
+func (s *computedState) onLoadingFinished() { + s.mu.Lock() + defer s.mu.Unlock() + if s.dead { + return + } + + s.netPending-- + if s.netPending < 0 { + // Clamping to zero: received a loadingFinished with no matching onRequest. + // This can happen if we attached mid-flight and missed the requestWillBeSent event. + s.netPending = 0 + } + if s.netPending > 0 || s.netFired { + return + } + // All requests done and not yet fired: start 500ms debounce timer. + s.startNetIdleTimer() +} + +// startNetIdleTimer arms the network_idle debounce timer. Must be called with s.mu held. +func (s *computedState) startNetIdleTimer() { + if s.dead { + return + } + stopTimer(s.netTimer) + navSeq := s.navSeq + navData := s.navData + navMeta := s.navMeta + s.netTimer = time.AfterFunc(networkIdleDebounce, func() { + s.mu.Lock() + if s.dead || s.navSeq != navSeq || s.netFired || s.netPending > 0 { + s.mu.Unlock() + return + } + s.netFired = true + s.navNetIdle = true + evs := []events.Event{{ + Ts: time.Now().UnixMicro(), + Type: EventNetworkIdle, + Category: events.CategoryNetwork, + Source: events.Source{ + Kind: events.KindCDP, + Metadata: navMeta, + }, + Data: navData, + }} + evs = append(evs, s.pendingNavigationSettled()...) + s.mu.Unlock() + for _, ev := range evs { + s.publish(ev) + } + }) +} + +// onPageLoad is called on Page.loadEventFired. +func (s *computedState) onPageLoad() { + s.mu.Lock() + defer s.mu.Unlock() + if s.dead { + return + } + s.pageLoadSeen = true + if s.layoutFired { + return + } + // Start the 1s layout_settled timer. + stopTimer(s.layoutTimer) + navSeq := s.navSeq + s.layoutTimer = time.AfterFunc(layoutSettledDebounce, func() { s.emitLayoutSettled(navSeq) }) +} + +// onLayoutShift is called when a layout_shift sentinel arrives from injected JS. +func (s *computedState) onLayoutShift() { + s.mu.Lock() + defer s.mu.Unlock() + if s.dead || s.layoutFired || !s.pageLoadSeen { + return + } + // Reset the timer to 1s from now. 
+ stopTimer(s.layoutTimer) + navSeq := s.navSeq + s.layoutTimer = time.AfterFunc(layoutSettledDebounce, func() { s.emitLayoutSettled(navSeq) }) +} + +// emitLayoutSettled is called from the layout timer's AfterFunc goroutine. +func (s *computedState) emitLayoutSettled(navSeq int) { + s.mu.Lock() + if s.dead || s.navSeq != navSeq || s.layoutFired || !s.pageLoadSeen { + s.mu.Unlock() + return + } + s.layoutFired = true + s.navLayoutSettled = true + navData := s.navData + navMeta := s.navMeta + evs := []events.Event{{ + Ts: time.Now().UnixMicro(), + Type: EventLayoutSettled, + Category: events.CategoryPage, + Source: events.Source{ + Kind: events.KindCDP, + Metadata: navMeta, + }, + Data: navData, + }} + evs = append(evs, s.pendingNavigationSettled()...) + s.mu.Unlock() + for _, ev := range evs { + s.publish(ev) + } +} + +// onDOMContentLoaded is called on Page.domContentEventFired. +func (s *computedState) onDOMContentLoaded() { + s.mu.Lock() + s.navDOMLoaded = true + evs := s.pendingNavigationSettled() + s.mu.Unlock() + for _, ev := range evs { + s.publish(ev) + } +} + +// pendingNavigationSettled returns a navigation_settled event if all three +// conditions are met. Must be called with s.mu held. 
+func (s *computedState) pendingNavigationSettled() []events.Event { + if s.dead { + return nil + } + if s.navDOMLoaded && s.navNetIdle && s.navLayoutSettled && !s.navFired { + s.navFired = true + return []events.Event{{ + Ts: time.Now().UnixMicro(), + Type: EventNavigationSettled, + Category: events.CategoryPage, + Source: events.Source{ + Kind: events.KindCDP, + Metadata: s.navMeta, + }, + Data: s.navData, + }} + } + return nil +} diff --git a/server/lib/cdpmonitor/computed_test.go b/server/lib/cdpmonitor/computed_test.go new file mode 100644 index 00000000..700f58f4 --- /dev/null +++ b/server/lib/cdpmonitor/computed_test.go @@ -0,0 +1,191 @@ +package cdpmonitor + +import ( + "encoding/json" + "testing" + "time" + + "github.com/kernel/kernel-images/server/lib/events" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newTestComputed creates a computedState with an eventCollector for testing. +func newTestComputed(t *testing.T) (*computedState, *eventCollector) { + t.Helper() + ec := newEventCollector() + cs := newComputedState(ec.publishFn()) + return cs, ec +} + +func TestNetworkIdle(t *testing.T) { + t.Run("debounce_500ms", func(t *testing.T) { + cs, ec := newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, navContext{})) + + cs.onRequest() + cs.onRequest() + cs.onRequest() + + t0 := time.Now() + cs.onLoadingFinished() + cs.onLoadingFinished() + cs.onLoadingFinished() + + ev := ec.waitFor(t, "network_idle", 2*time.Second) + assert.GreaterOrEqual(t, time.Since(t0).Milliseconds(), int64(400), "fired too early") + assert.Equal(t, events.CategoryNetwork, ev.Category) + }) + + t.Run("timer_reset_on_new_request", func(t *testing.T) { + cs, ec := newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, navContext{})) + + cs.onRequest() + cs.onLoadingFinished() + time.Sleep(200 * time.Millisecond) + + cs.onRequest() + t1 := time.Now() + cs.onLoadingFinished() + + ec.waitFor(t, "network_idle", 2*time.Second) + 
assert.GreaterOrEqual(t, time.Since(t1).Milliseconds(), int64(400), "should reset timer on new request") + }) +} + +func TestLayoutSettled(t *testing.T) { + t.Run("debounce_1s_after_page_load", func(t *testing.T) { + cs, ec := newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, navContext{})) + + t0 := time.Now() + cs.onPageLoad() + + ev := ec.waitFor(t, "page_layout_settled", 3*time.Second) + assert.GreaterOrEqual(t, time.Since(t0).Milliseconds(), int64(900), "fired too early") + assert.Equal(t, events.CategoryPage, ev.Category) + }) + + t.Run("layout_shift_before_page_load_ignored", func(t *testing.T) { + cs, ec := newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, navContext{})) + + // layout_shift before page_load should be ignored; layout_settled must + // still fire after page_load's 1s debounce. + cs.onLayoutShift() + t0 := time.Now() + cs.onPageLoad() + + ec.waitFor(t, "page_layout_settled", 3*time.Second) + assert.GreaterOrEqual(t, time.Since(t0).Milliseconds(), int64(900), "should fire 1s after page_load, not layout_shift") + }) + + t.Run("layout_shift_resets_timer", func(t *testing.T) { + cs, ec := newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, navContext{})) + cs.onPageLoad() + + time.Sleep(600 * time.Millisecond) + cs.onLayoutShift() + t1 := time.Now() + + ec.waitFor(t, "page_layout_settled", 3*time.Second) + assert.GreaterOrEqual(t, time.Since(t1).Milliseconds(), int64(900), "should reset after layout_shift") + }) +} + +func TestNavigationSettled(t *testing.T) { + t.Run("fires_when_all_three_flags_set", func(t *testing.T) { + cs, ec := newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, navContext{})) + + cs.onDOMContentLoaded() + cs.onRequest() + cs.onLoadingFinished() + cs.onPageLoad() + + ev := ec.waitFor(t, "page_navigation_settled", 3*time.Second) + assert.Equal(t, events.CategoryPage, ev.Category) + }) + + t.Run("interrupted_by_new_navigation", func(t *testing.T) { + cs, ec := 
newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, navContext{})) + + cs.onDOMContentLoaded() + cs.onRequest() + cs.onLoadingFinished() + + // Interrupt before layout_settled fires. + require.NoError(t, cs.resetOnNavigation(0, navContext{})) + + ec.assertNone(t, "page_navigation_settled", 1500*time.Millisecond) + }) +} + +func TestNavDataMetadata(t *testing.T) { + ctx := navContext{ + sessionID: "s1", + targetID: "t1", + targetType: "page", + frameID: "f1", + loaderID: "l1", + url: "https://example.com", + } + + t.Run("layout_settled_carries_navData_and_navMeta", func(t *testing.T) { + cs, ec := newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, ctx)) + cs.onPageLoad() + + ev := ec.waitFor(t, "page_layout_settled", 3*time.Second) + assert.Equal(t, events.CategoryPage, ev.Category) + assert.Equal(t, "s1", ev.Source.Metadata[MetadataKeyCDPSessionID]) + assert.Equal(t, "t1", ev.Source.Metadata[MetadataKeyTargetID]) + assert.Equal(t, "page", ev.Source.Metadata[MetadataKeyTargetType]) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "s1", data["session_id"]) + assert.Equal(t, "l1", data["loader_id"]) + assert.Equal(t, "https://example.com", data["url"]) + }) + + t.Run("navigation_settled_carries_navData_and_navMeta", func(t *testing.T) { + cs, ec := newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, ctx)) + + cs.onDOMContentLoaded() + cs.onRequest() + cs.onLoadingFinished() + cs.onPageLoad() + + ev := ec.waitFor(t, "page_navigation_settled", 3*time.Second) + assert.Equal(t, events.CategoryPage, ev.Category) + assert.Equal(t, "s1", ev.Source.Metadata[MetadataKeyCDPSessionID]) + assert.Equal(t, "t1", ev.Source.Metadata[MetadataKeyTargetID]) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "s1", data["session_id"]) + assert.Equal(t, "l1", data["loader_id"]) + }) +} + +func TestStopSuppressesTimers(t *testing.T) { + 
t.Run("stop_suppresses_network_idle", func(t *testing.T) { + cs, ec := newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, navContext{})) + cs.onRequest() + cs.onLoadingFinished() // arms 500ms network_idle timer + cs.stop() + ec.assertNone(t, "network_idle", 1200*time.Millisecond) + }) + + t.Run("stop_suppresses_layout_settled", func(t *testing.T) { + cs, ec := newTestComputed(t) + require.NoError(t, cs.resetOnNavigation(0, navContext{})) + cs.onPageLoad() // arms 1s layout_settled timer + cs.stop() + ec.assertNone(t, "page_layout_settled", 1500*time.Millisecond) + }) +} diff --git a/server/lib/cdpmonitor/domains.go b/server/lib/cdpmonitor/domains.go new file mode 100644 index 00000000..8502c59e --- /dev/null +++ b/server/lib/cdpmonitor/domains.go @@ -0,0 +1,66 @@ +package cdpmonitor + +import ( + "context" + _ "embed" +) + +// bindingName is the JS function exposed via Runtime.addBinding. +// Page JS calls this to fire Runtime.bindingCalled CDP events. +const bindingName = "__kernelEvent" + +// isPageLikeTarget reports whether the target type supports page-level CDP +// domains (Page.*, PerformanceTimeline.*, Page.addScriptToEvaluateOnNewDocument). +// Workers and service workers only support Runtime.* and Network.*. +func isPageLikeTarget(targetType string) bool { + return targetType == "page" || targetType == "iframe" +} + +// enableDomains enables CDP domains, registers the event binding, and starts +// layout-shift observation. Failures are non-fatal. +// Page-level domains (Page.enable, PerformanceTimeline.enable, Runtime.addBinding) +// are skipped for worker and service_worker targets that don't support them. 
+func (m *Monitor) enableDomains(ctx context.Context, sessionID string, targetType string) { + for _, method := range []string{ + "Runtime.enable", + "Network.enable", + } { + if _, err := m.send(ctx, method, nil, sessionID); err != nil && ctx.Err() == nil { + m.log.Warn("cdpmonitor: failed to enable CDP domain", "method", method, "session", sessionID, "err", err) + } + } + + if !isPageLikeTarget(targetType) { + return + } + + if _, err := m.send(ctx, "Page.enable", nil, sessionID); err != nil && ctx.Err() == nil { + m.log.Warn("cdpmonitor: failed to enable CDP domain", "method", "Page.enable", "session", sessionID, "err", err) + } + + if _, err := m.send(ctx, "Runtime.addBinding", map[string]any{ + "name": bindingName, + }, sessionID); err != nil && ctx.Err() == nil { + m.log.Warn("cdpmonitor: failed to register JS binding", "session", sessionID, "err", err) + } + + if _, err := m.send(ctx, "PerformanceTimeline.enable", map[string]any{ + "eventTypes": []string{timelineEventLayoutShift, timelineEventLCP}, + }, sessionID); err != nil && ctx.Err() == nil { + m.log.Warn("cdpmonitor: failed to enable PerformanceTimeline", "session", sessionID, "err", err) + } +} + +// injectedJS tracks clicks, keys, and scrolls via the __kernelEvent binding. +// Layout shifts are handled natively by PerformanceTimeline.enable. +// +//go:embed interaction.js +var injectedJS string + +// injectScript registers the interaction tracking JS for the given session. 
+func (m *Monitor) injectScript(ctx context.Context, sessionID string) error { + _, err := m.send(ctx, "Page.addScriptToEvaluateOnNewDocument", map[string]any{ + "source": injectedJS, + }, sessionID) + return err +} diff --git a/server/lib/cdpmonitor/handlers.go b/server/lib/cdpmonitor/handlers.go new file mode 100644 index 00000000..b1c80a6f --- /dev/null +++ b/server/lib/cdpmonitor/handlers.go @@ -0,0 +1,557 @@ +package cdpmonitor + +import ( + "context" + "encoding/base64" + "encoding/json" + "time" + + "github.com/kernel/kernel-images/server/lib/events" +) + +// logUnmarshalErr logs a Debug message when a handler can't parse CDP params. +// These indicate Chrome sent an unexpected params shape, rare and non-actionable +// at Warn/Error level, but useful in verbose mode. +func (m *Monitor) logUnmarshalErr(method string, err error) { + m.log.Debug("cdpmonitor: failed to parse CDP params", "method", method, "err", err) +} + +// publishEvent stamps common fields and publishes an event. +func (m *Monitor) publishEvent(eventType string, category events.EventCategory, source events.Source, sourceEvent string, data json.RawMessage, sessionID string) { + src := source + src.Event = sourceEvent + if sessionID != "" { + if src.Metadata == nil { + src.Metadata = make(map[string]string) + } + src.Metadata[MetadataKeyCDPSessionID] = sessionID + m.sessionsMu.RLock() + info := m.sessions[sessionID] + m.sessionsMu.RUnlock() + src.Metadata[MetadataKeyTargetID] = info.targetID + src.Metadata[MetadataKeyTargetType] = info.targetType + } + m.publish(events.Event{ + Ts: time.Now().UnixMicro(), + Type: eventType, + Category: category, + Source: src, + Data: data, + }) +} + +// decodeParams unmarshals msg.Params into dst, logging on failure. +// Returns true on success so dispatch can gate the handler call. 
+func (m *Monitor) decodeParams(method string, params json.RawMessage, dst any) bool { + if err := json.Unmarshal(params, dst); err != nil { + m.logUnmarshalErr(method, err) + return false + } + return true +} + +// dispatchEvent routes a CDP event to its handler. +func (m *Monitor) dispatchEvent(msg cdpMessage) { + m.lifeMu.Lock() + ctx := m.lifecycleCtx + m.lifeMu.Unlock() + + switch msg.Method { + case "Runtime.consoleAPICalled": + var p cdpRuntimeConsoleAPICalledParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleConsole(p, msg.SessionID) + } + case "Runtime.exceptionThrown": + var p cdpRuntimeExceptionThrownParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleExceptionThrown(ctx, p, msg.SessionID) + } + case "Runtime.bindingCalled": + var p cdpRuntimeBindingCalledParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleBindingCalled(p, msg.SessionID) + } + case "Network.requestWillBeSent": + var p cdpNetworkRequestWillBeSentParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleNetworkRequest(p, msg.SessionID) + } + case "Network.responseReceived": + var p cdpNetworkResponseReceivedParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleResponseReceived(p, msg.SessionID) + } + case "Network.loadingFinished": + var p cdpNetworkLoadingFinishedParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleLoadingFinished(ctx, p, msg.SessionID) + } + case "Network.loadingFailed": + var p cdpNetworkLoadingFailedParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleLoadingFailed(p, msg.SessionID) + } + case "Page.frameNavigated": + var p cdpPageFrameNavigatedParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleFrameNavigated(p, msg.SessionID) + } + case "Page.domContentEventFired": + var p cdpPageDomContentEventFiredParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleDOMContentLoaded(p, msg.SessionID) + } + case "Page.loadEventFired": + var p 
cdpPageLoadEventFiredParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleLoadEventFired(ctx, p, msg.SessionID) + } + case "PerformanceTimeline.timelineEventAdded": + var p cdpPerformanceTimelineEventAddedParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleTimelineEvent(p, msg.SessionID) + } + case "Target.attachedToTarget": + var p cdpTargetAttachedToTargetParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleAttachedToTarget(ctx, p) + } + case "Target.detachedFromTarget": + var p cdpTargetDetachedFromTargetParams + if m.decodeParams(msg.Method, msg.Params, &p) { + m.handleDetachedFromTarget(p) + } + } +} + +func (m *Monitor) handleConsole(p cdpRuntimeConsoleAPICalledParams, sessionID string) { + text := "" + if len(p.Args) > 0 { + text = consoleArgString(p.Args[0]) + } + argValues := make([]string, 0, len(p.Args)) + for _, a := range p.Args { + argValues = append(argValues, consoleArgString(a)) + } + eventType := EventConsoleLog + if p.Type == "error" { + eventType = EventConsoleError + } + cs := m.computedFor(sessionID) + data := cs.navDataWith(map[string]any{ + "level": p.Type, + "text": text, + "args": argValues, + "stack_trace": p.StackTrace, + }) + m.publishEvent(eventType, events.CategoryConsole, events.Source{Kind: events.KindCDP}, "Runtime.consoleAPICalled", data, sessionID) +} + +func (m *Monitor) handleExceptionThrown(ctx context.Context, p cdpRuntimeExceptionThrownParams, sessionID string) { + cs := m.computedFor(sessionID) + // source_url is the script file URL; distinct from nav context's url (the page URL). 
+ data := cs.navDataWith(map[string]any{ + "text": p.ExceptionDetails.Text, + "line": p.ExceptionDetails.LineNumber, + "column": p.ExceptionDetails.ColumnNumber, + "source_url": p.ExceptionDetails.URL, + "stack_trace": p.ExceptionDetails.StackTrace, + }) + m.publishEvent(EventConsoleError, events.CategoryConsole, events.Source{Kind: events.KindCDP}, "Runtime.exceptionThrown", data, sessionID) + m.tryScreenshot(ctx, "Runtime.exceptionThrown", sessionID) +} + +// bindingMinInterval is the minimum time between accepted __kernelEvent binding +// calls per session. This caps throughput at 20 events/s per session, preventing +// a misbehaving page from flooding the event pipeline. +const bindingMinInterval = 50 * time.Millisecond + +// handleBindingCalled processes __kernelEvent binding calls from the page. +func (m *Monitor) handleBindingCalled(p cdpRuntimeBindingCalledParams, sessionID string) { + if p.Name != bindingName { + return + } + + payload := json.RawMessage(p.Payload) + if !json.Valid(payload) { + return + } + var header struct { + Type string `json:"type"` + } + if err := json.Unmarshal(payload, &header); err != nil { + return + } + switch header.Type { + case EventInteractionClick, EventInteractionKey, EventScrollSettled: + default: + return + } + + // Rate-limit per (session, event type): cap at 20 events/s per pair so a + // misbehaving page cannot flood the event pipeline with a single event type. 
+ now := time.Now() + rateKey := sessionID + ":" + header.Type + m.bindingRateMu.Lock() + last := m.bindingLastSeen[rateKey] + if now.Sub(last) < bindingMinInterval { + m.bindingRateMu.Unlock() + return + } + m.bindingLastSeen[rateKey] = now + m.bindingRateMu.Unlock() + + var payloadMap map[string]any + _ = json.Unmarshal(payload, &payloadMap) + cs := m.computedFor(sessionID) + m.publishEvent(header.Type, events.CategoryInteraction, events.Source{Kind: events.KindCDP}, "Runtime.bindingCalled", cs.navDataWith(payloadMap), sessionID) +} + +// handleTimelineEvent processes PerformanceTimeline layout-shift and LCP events. +func (m *Monitor) handleTimelineEvent(p cdpPerformanceTimelineEventAddedParams, sessionID string) { + switch p.Event.Type { + case timelineEventLayoutShift: + // source_frame_id is the frame where the shift occurred; distinct from nav + // context's frame_id (the top-level navigated frame). + ev := map[string]any{ + "source_frame_id": p.Event.FrameID, + "time": p.Event.Time, + "duration": p.Event.Duration, + } + var shift cdpLayoutShiftDetails + if p.Event.LayoutShiftDetails != nil && json.Unmarshal(p.Event.LayoutShiftDetails, &shift) == nil { + ev["layout_shift_details"] = map[string]any{ + "score": shift.Score, + "had_recent_input": shift.HadRecentInput, + } + } + cs := m.computedFor(sessionID) + data := cs.navDataWith(ev) + m.publishEvent(EventLayoutShift, events.CategoryPage, events.Source{Kind: events.KindCDP}, "PerformanceTimeline.timelineEventAdded", data, sessionID) + if cs != nil { + cs.onLayoutShift() + } + + case timelineEventLCP: + ev := map[string]any{ + "source_frame_id": p.Event.FrameID, + "time": p.Event.Time, + } + var lcp cdpLcpDetails + if p.Event.LcpDetails != nil && json.Unmarshal(p.Event.LcpDetails, &lcp) == nil { + ev["lcp_details"] = map[string]any{ + "render_time": lcp.RenderTime, + "load_time": lcp.LoadTime, + "size": lcp.Size, + "element_id": lcp.ElementID, + "url": lcp.URL, + "node_id": lcp.NodeID, + } + } + cs := 
m.computedFor(sessionID) + data := cs.navDataWith(ev) + m.publishEvent(EventLCP, events.CategoryPage, events.Source{Kind: events.KindCDP}, "PerformanceTimeline.timelineEventAdded", data, sessionID) + } +} + +// handleNetworkRequest publishes network_request events. +// NOTE: events include raw headers, post_data, and (on response) truncated +// bodies which may contain cookies, bearer tokens, or other credentials. +// This mirrors what CDP/DevTools itself exposes. Consumers should treat the +// event stream as privileged data; opt-in redaction can be added later. +func (m *Monitor) handleNetworkRequest(p cdpNetworkRequestWillBeSentParams, sessionID string) { + // Extract only the initiator type; the stack trace is too verbose and dominates event size. + var initiatorType string + var raw struct { + Type string `json:"type"` + } + if json.Unmarshal(p.Initiator, &raw) == nil { + initiatorType = raw.Type + } + + // Redirects reuse the same requestId and fire additional requestWillBeSent + // events, but only a single loadingFinished fires per chain. Only increment + // netPending for genuinely new requests to avoid permanently inflating the + // counter and blocking network_idle. 
+ m.pendReqMu.Lock() + existing, isRedirect := m.pendingRequests[p.RequestID] + addedAt := existing.addedAt + if !isRedirect { + addedAt = time.Now() + } + m.pendingRequests[p.RequestID] = networkReqState{ + sessionID: sessionID, + method: p.Request.Method, + url: p.Request.URL, + headers: p.Request.Headers, + postData: p.Request.PostData, + resourceType: p.Type, + loaderID: p.LoaderID, + frameID: p.FrameID, + addedAt: addedAt, + } + m.pendReqMu.Unlock() + ev := map[string]any{ + "request_id": p.RequestID, + "loader_id": p.LoaderID, + "frame_id": p.FrameID, + "document_url": p.DocumentURL, + "method": p.Request.Method, + "url": p.Request.URL, + "headers": p.Request.Headers, + "initiator_type": initiatorType, + } + if p.Request.PostData != "" { + ev["post_data"] = p.Request.PostData + } + if p.Type != "" { + ev["resource_type"] = p.Type + } + if isRedirect { + ev["is_redirect"] = true + ev["redirect_url"] = existing.url + } + data, _ := json.Marshal(ev) + m.publishEvent(EventNetworkRequest, events.CategoryNetwork, events.Source{Kind: events.KindCDP}, "Network.requestWillBeSent", data, sessionID) + if !isRedirect { + if cs := m.computedFor(sessionID); cs != nil { + cs.onRequest() + } + } +} + +func (m *Monitor) handleResponseReceived(p cdpNetworkResponseReceivedParams, _ string) { + m.pendReqMu.Lock() + if state, ok := m.pendingRequests[p.RequestID]; ok { + state.status = p.Response.Status + state.statusText = p.Response.StatusText + state.resHeaders = p.Response.Headers + state.mimeType = p.Response.MimeType + m.pendingRequests[p.RequestID] = state + } + m.pendReqMu.Unlock() +} + +func (m *Monitor) handleLoadingFinished(ctx context.Context, p cdpNetworkLoadingFinishedParams, sessionID string) { + m.pendReqMu.Lock() + state, ok := m.pendingRequests[p.RequestID] + if ok { + delete(m.pendingRequests, p.RequestID) + } + m.pendReqMu.Unlock() + if !ok { + return + } + if cs := m.computedFor(state.sessionID); cs != nil { + cs.onLoadingFinished() + } + // Fetch response 
body async to avoid blocking readLoop; binary types are skipped. + m.asyncWg.Go(func() { + body := m.fetchResponseBody(ctx, p.RequestID, sessionID, state) + ev := map[string]any{ + "request_id": p.RequestID, + "loader_id": state.loaderID, + "frame_id": state.frameID, + "method": state.method, + "url": state.url, + "status": state.status, + "headers": state.resHeaders, + } + if state.statusText != "" { + ev["status_text"] = state.statusText + } + if state.mimeType != "" { + ev["mime_type"] = state.mimeType + } + if state.resourceType != "" { + ev["resource_type"] = state.resourceType + } + if body != "" { + ev["body"] = body + } + data, _ := json.Marshal(ev) + m.publishEvent(EventNetworkResponse, events.CategoryNetwork, events.Source{Kind: events.KindCDP}, "Network.loadingFinished", data, sessionID) + }) +} + +// fetchResponseBody retrieves and truncates the response body for textual resources. +func (m *Monitor) fetchResponseBody(ctx context.Context, requestID, sessionID string, state networkReqState) string { + if !isTextualResource(state.resourceType, state.mimeType) { + return "" + } + result, err := m.send(ctx, "Network.getResponseBody", map[string]any{ + "requestId": requestID, + }, sessionID) + if err != nil { + return "" + } + var resp struct { + Body string `json:"body"` + Base64Encoded bool `json:"base64Encoded"` + } + if json.Unmarshal(result, &resp) != nil { + return "" + } + body := resp.Body + if resp.Base64Encoded { + decoded, err := base64.StdEncoding.DecodeString(body) + if err != nil { + return "" + } + body = string(decoded) + } + return truncateBody(body, bodyCapFor(state.mimeType)) +} + +func (m *Monitor) handleLoadingFailed(p cdpNetworkLoadingFailedParams, sessionID string) { + m.pendReqMu.Lock() + state, ok := m.pendingRequests[p.RequestID] + if ok { + delete(m.pendingRequests, p.RequestID) + } + m.pendReqMu.Unlock() + + ev := map[string]any{ + "request_id": p.RequestID, + "error_text": p.ErrorText, + "canceled": p.Canceled, + } + if ok { + 
ev["url"] = state.url + ev["loader_id"] = state.loaderID + ev["frame_id"] = state.frameID + ev["resource_type"] = state.resourceType + } + data, _ := json.Marshal(ev) + m.publishEvent(EventNetworkLoadingFailed, events.CategoryNetwork, events.Source{Kind: events.KindCDP}, "Network.loadingFailed", data, sessionID) + if ok { + if cs := m.computedFor(state.sessionID); cs != nil { + cs.onLoadingFinished() + } + } +} + +func (m *Monitor) handleFrameNavigated(p cdpPageFrameNavigatedParams, sessionID string) { + // Pre-fetch target info and computedState before acquiring pendReqMu to + // avoid a pendReqMu → sessionsMu ordering cycle. + m.sessionsMu.RLock() + info := m.sessions[sessionID] + cs := m.computedStates[sessionID] + m.sessionsMu.RUnlock() + + data, _ := json.Marshal(map[string]any{ + "session_id": sessionID, + "target_id": info.targetID, + "target_type": info.targetType, + "url": p.Frame.URL, + "frame_id": p.Frame.ID, + "parent_frame_id": p.Frame.ParentID, + "loader_id": p.Frame.LoaderID, + }) + m.publishEvent(EventNavigation, events.CategoryPage, events.Source{Kind: events.KindCDP}, "Page.frameNavigated", data, sessionID) + + // Only reset state for top-level navigations; subframe (iframe) navigations + // should not disrupt main-page tracking. + if p.Frame.ParentID == "" { + m.mainSessionID.Store(sessionID) + + navCtx := navContext{ + sessionID: sessionID, + targetID: info.targetID, + targetType: info.targetType, + frameID: p.Frame.ID, + loaderID: p.Frame.LoaderID, + url: p.Frame.URL, + } + + m.pendReqMu.Lock() + for id, req := range m.pendingRequests { + if req.sessionID == sessionID { + delete(m.pendingRequests, id) + } + } + // Reset while holding pendReqMu so new requests arriving concurrently + // can't increment netPending before the reset completes. + // inflight=0: remaining pendingRequests belong to other target sessions; + // their loadingFinished events decrement those sessions' own state machines, + // not this one, so we start fresh. 
+ if cs != nil { + if err := cs.resetOnNavigation(0, navCtx); err != nil { + m.log.Error("cdpmonitor: failed to build nav event payload", "err", err) + } + } + m.pendReqMu.Unlock() + } +} + +func (m *Monitor) handleDOMContentLoaded(p cdpPageDomContentEventFiredParams, sessionID string) { + cs := m.computedFor(sessionID) + data := cs.navDataWith(map[string]any{"cdp_timestamp": p.Timestamp}) + m.publishEvent(EventDOMContentLoaded, events.CategoryPage, events.Source{Kind: events.KindCDP}, "Page.domContentEventFired", data, sessionID) + if cs != nil { + cs.onDOMContentLoaded() + } +} + +func (m *Monitor) handleLoadEventFired(ctx context.Context, p cdpPageLoadEventFiredParams, sessionID string) { + cs := m.computedFor(sessionID) + data := cs.navDataWith(map[string]any{"cdp_timestamp": p.Timestamp}) + m.publishEvent(EventPageLoad, events.CategoryPage, events.Source{Kind: events.KindCDP}, "Page.loadEventFired", data, sessionID) + if cs != nil { + cs.onPageLoad() + } + if m.mainSessionID.Load() == sessionID { + m.tryScreenshot(ctx, "Page.loadEventFired", sessionID) + } +} + +// handleAttachedToTarget stores the new session then enables domains and injects script. +// The outer message sessionID (root session) is unused; the child session we +// attached to is in p.SessionID. 
+func (m *Monitor) handleAttachedToTarget(ctx context.Context, p cdpTargetAttachedToTargetParams) { + m.sessionsMu.Lock() + m.sessions[p.SessionID] = targetInfo{ + targetID: p.TargetInfo.TargetID, + url: p.TargetInfo.URL, + targetType: p.TargetInfo.Type, + } + if p.TargetInfo.Type == targetTypePage { + m.computedStates[p.SessionID] = newComputedState(m.publish) + } + m.sessionsMu.Unlock() + + if p.TargetInfo.Type == targetTypePage { + data, _ := json.Marshal(map[string]any{ + "target_id": p.TargetInfo.TargetID, + "target_type": p.TargetInfo.Type, + "url": p.TargetInfo.URL, + "opener_id": p.TargetInfo.OpenerID, + "title": p.TargetInfo.Title, + }) + m.publishEvent(EventTabOpened, events.CategoryPage, events.Source{Kind: events.KindCDP}, "Target.attachedToTarget", data, p.SessionID) + } + + targetType := p.TargetInfo.Type + // Async to avoid blocking the readLoop. + m.asyncWg.Go(func() { + m.enableDomains(ctx, p.SessionID, targetType) + if isPageLikeTarget(targetType) { + _ = m.injectScript(ctx, p.SessionID) + } + }) +} + +func (m *Monitor) handleDetachedFromTarget(p cdpTargetDetachedFromTargetParams) { + if p.SessionID == "" { + return + } + m.sessionsMu.Lock() + cs := m.computedStates[p.SessionID] + delete(m.sessions, p.SessionID) + delete(m.computedStates, p.SessionID) + m.sessionsMu.Unlock() + if cs != nil { + cs.stop() + } +} diff --git a/server/lib/cdpmonitor/handlers_test.go b/server/lib/cdpmonitor/handlers_test.go new file mode 100644 index 00000000..c7704099 --- /dev/null +++ b/server/lib/cdpmonitor/handlers_test.go @@ -0,0 +1,550 @@ +package cdpmonitor + +import ( + "encoding/json" + "sync/atomic" + "testing" + "time" + + "github.com/kernel/kernel-images/server/lib/events" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConsoleEvents(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + t.Run("console_log", func(t *testing.T) { + 
srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.consoleAPICalled", + "params": map[string]any{ + "type": "log", + "args": []any{map[string]any{"type": "string", "value": "hello world"}}, + }, + }) + ev := ec.waitFor(t, "console_log", 2*time.Second) + assert.Equal(t, events.CategoryConsole, ev.Category) + assert.Equal(t, events.KindCDP, ev.Source.Kind) + assert.Equal(t, "Runtime.consoleAPICalled", ev.Source.Event) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "log", data["level"]) + assert.Equal(t, "hello world", data["text"]) + }) + + t.Run("exception_thrown", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.exceptionThrown", + "params": map[string]any{ + "timestamp": 1234.5, + "exceptionDetails": map[string]any{ + "text": "Uncaught TypeError", + "lineNumber": 42, + "columnNumber": 7, + "url": "https://example.com/app.js", + }, + }, + }) + ev := ec.waitFor(t, "console_error", 2*time.Second) + assert.Equal(t, events.CategoryConsole, ev.Category) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "Uncaught TypeError", data["text"]) + assert.Equal(t, float64(42), data["line"]) + }) + + t.Run("non_string_args", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.consoleAPICalled", + "params": map[string]any{ + "type": "log", + "args": []any{ + map[string]any{"type": "number", "value": 42}, + map[string]any{"type": "object", "value": map[string]any{"key": "val"}}, + map[string]any{"type": "undefined"}, + }, + }, + }) + ev := ec.waitForNew(t, "console_log", 2*time.Second) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + args := data["args"].([]any) + assert.Equal(t, "42", args[0]) + assert.Contains(t, args[1], "key") + assert.Equal(t, "undefined", args[2]) + }) +} + +func TestNetworkEvents(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + var getBodyCalled 
atomic.Bool + responder := func(msg cdpMessage) any { + if msg.Method == "Network.getResponseBody" { + getBodyCalled.Store(true) + return map[string]any{ + "id": msg.ID, + "result": map[string]any{"body": `{"ok":true}`, "base64Encoded": false}, + } + } + return nil + } + _, ec, cleanup := startMonitor(t, srv, responder) + defer cleanup() + + t.Run("request_and_response", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", + "params": map[string]any{ + "requestId": "req-001", + "type": "XHR", + "request": map[string]any{ + "method": "POST", + "url": "https://api.example.com/data", + "headers": map[string]any{"Content-Type": "application/json"}, + }, + "initiator": map[string]any{"type": "script"}, + }, + }) + ev := ec.waitFor(t, "network_request", 2*time.Second) + assert.Equal(t, events.CategoryNetwork, ev.Category) + assert.Equal(t, "Network.requestWillBeSent", ev.Source.Event) + + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "POST", data["method"]) + assert.Equal(t, "https://api.example.com/data", data["url"]) + assert.Equal(t, "XHR", data["resource_type"], "resource_type must be populated from PDL 'type' wire field") + + srv.sendToMonitor(t, map[string]any{ + "method": "Network.responseReceived", + "params": map[string]any{ + "requestId": "req-001", + "response": map[string]any{ + "status": 200, "statusText": "OK", + "headers": map[string]any{"Content-Type": "application/json"}, "mimeType": "application/json", + }, + }, + }) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFinished", + "params": map[string]any{"requestId": "req-001"}, + }) + + ev2 := ec.waitFor(t, "network_response", 3*time.Second) + assert.Equal(t, "Network.loadingFinished", ev2.Source.Event) + var data2 map[string]any + require.NoError(t, json.Unmarshal(ev2.Data, &data2)) + assert.Equal(t, float64(200), data2["status"]) + assert.NotEmpty(t, data2["body"]) + }) + + 
t.Run("loading_failed", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", + "params": map[string]any{ + "requestId": "req-002", + "request": map[string]any{"method": "GET", "url": "https://fail.example.com/"}, + }, + }) + ec.waitForNew(t, "network_request", 2*time.Second) + + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFailed", + "params": map[string]any{ + "requestId": "req-002", + "errorText": "net::ERR_CONNECTION_REFUSED", + "canceled": false, + }, + }) + ev := ec.waitFor(t, "network_loading_failed", 2*time.Second) + assert.Equal(t, events.CategoryNetwork, ev.Category) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "net::ERR_CONNECTION_REFUSED", data["error_text"]) + }) + + t.Run("binary_resource_skips_body", func(t *testing.T) { + getBodyCalled.Store(false) + // Use PDL wire key "type" (not "resourceType") — Chrome emits ResourceType + // under "type" for Network.requestWillBeSent. 
+ srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", + "params": map[string]any{ + "requestId": "img-001", + "type": "Image", + "request": map[string]any{"method": "GET", "url": "https://example.com/photo.png"}, + }, + }) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.responseReceived", + "params": map[string]any{ + "requestId": "img-001", + "response": map[string]any{"status": 200, "statusText": "OK", "headers": map[string]any{}, "mimeType": "image/png"}, + }, + }) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFinished", + "params": map[string]any{"requestId": "img-001"}, + }) + + ev := ec.waitForNew(t, "network_response", 3*time.Second) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Nil(t, data["body"], "binary resource should not have body field") + assert.False(t, getBodyCalled.Load(), "should not call getResponseBody for images") + }) +} + +func TestPageEvents(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + // Attach a page target first so computedState exists for nav context. 
+ srv.sendToMonitor(t, map[string]any{ + "method": "Target.attachedToTarget", + "params": map[string]any{ + "sessionId": "sess-page", + "targetInfo": map[string]any{ + "targetId": "target-page", "type": "page", + "url": "about:blank", "attached": true, + }, + "waitingForDebugger": false, + }, + }) + ec.waitFor(t, "page_tab_opened", 2*time.Second) + + srv.sendToMonitor(t, map[string]any{ + "method": "Page.frameNavigated", "sessionId": "sess-page", + "params": map[string]any{ + "frame": map[string]any{ + "id": "frame-1", "url": "https://example.com/page", + "loaderId": "loader-1", + }, + }, + }) + ev := ec.waitFor(t, "page_navigation", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev.Category) + assert.Equal(t, "Page.frameNavigated", ev.Source.Event) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "https://example.com/page", data["url"]) + + srv.sendToMonitor(t, map[string]any{ + "method": "Page.domContentEventFired", "sessionId": "sess-page", + "params": map[string]any{"timestamp": 1000.0}, + }) + ev2 := ec.waitFor(t, "page_dom_content_loaded", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev2.Category) + var data2 map[string]any + require.NoError(t, json.Unmarshal(ev2.Data, &data2)) + assert.Equal(t, float64(1000.0), data2["cdp_timestamp"]) + assert.Equal(t, "loader-1", data2["loader_id"]) + assert.Equal(t, "https://example.com/page", data2["url"]) + + srv.sendToMonitor(t, map[string]any{ + "method": "Page.loadEventFired", "sessionId": "sess-page", + "params": map[string]any{"timestamp": 1001.0}, + }) + ev3 := ec.waitFor(t, "page_load", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev3.Category) + var data3 map[string]any + require.NoError(t, json.Unmarshal(ev3.Data, &data3)) + assert.Equal(t, float64(1001.0), data3["cdp_timestamp"]) + assert.Equal(t, "loader-1", data3["loader_id"]) +} + +func TestTabOpened(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + _, ec, cleanup := 
startMonitor(t, srv, nil) + defer cleanup() + + t.Run("page_target_emits_tab_opened", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Target.attachedToTarget", + "params": map[string]any{ + "sessionId": "sess-tab", + "targetInfo": map[string]any{ + "targetId": "target-tab", "type": "page", + "url": "https://example.com", "attached": true, + "title": "Example", + }, + "waitingForDebugger": false, + }, + }) + ev := ec.waitFor(t, "page_tab_opened", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev.Category) + assert.Equal(t, "Target.attachedToTarget", ev.Source.Event) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "target-tab", data["target_id"]) + assert.Equal(t, "page", data["target_type"]) + assert.Equal(t, "https://example.com", data["url"]) + assert.Equal(t, "Example", data["title"]) + }) + + t.Run("iframe_target_no_tab_opened", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Target.attachedToTarget", + "params": map[string]any{ + "sessionId": "sess-iframe", + "targetInfo": map[string]any{ + "targetId": "target-iframe", "type": "iframe", + "url": "https://iframe.example.com", "attached": true, + }, + "waitingForDebugger": false, + }, + }) + ec.assertNone(t, "page_tab_opened", 200*time.Millisecond) + }) +} + +func TestBindingAndTimeline(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + t.Run("interaction_click", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "__kernelEvent", + "payload": `{"type":"interaction_click","x":10,"y":20,"selector":"button","tag":"BUTTON","text":"OK"}`, + }, + }) + ev := ec.waitFor(t, "interaction_click", 2*time.Second) + assert.Equal(t, events.CategoryInteraction, ev.Category) + assert.Equal(t, "Runtime.bindingCalled", ev.Source.Event) + }) + + 
t.Run("interaction_scroll_settled", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "__kernelEvent", + "payload": `{"type":"interaction_scroll_settled","from_x":0,"from_y":0,"to_x":0,"to_y":500,"target_selector":"body"}`, + }, + }) + ev := ec.waitFor(t, "interaction_scroll_settled", 2*time.Second) + assert.Equal(t, events.CategoryInteraction, ev.Category) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, float64(500), data["to_y"]) + }) + + t.Run("layout_shift", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "PerformanceTimeline.timelineEventAdded", + "params": map[string]any{ + "event": map[string]any{ + "type": "layout-shift", + "frameId": "frame-ls", + "time": 1.5, + "duration": 0.0, + "layoutShiftDetails": map[string]any{ + "score": 0.12, + "hadRecentInput": true, + }, + }, + }, + }) + ev := ec.waitFor(t, "page_layout_shift", 2*time.Second) + assert.Equal(t, events.KindCDP, ev.Source.Kind) + assert.Equal(t, "PerformanceTimeline.timelineEventAdded", ev.Source.Event) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "frame-ls", data["source_frame_id"]) + assert.Equal(t, float64(1.5), data["time"]) + shift := data["layout_shift_details"].(map[string]any) + assert.Equal(t, 0.12, shift["score"]) + assert.Equal(t, true, shift["had_recent_input"]) + _, hasEvent := data["event"] + assert.False(t, hasEvent, "raw CDP event wrapper must not appear in payload") + }) + + t.Run("unknown_binding_ignored", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "someOtherBinding", + "payload": `{"type":"interaction_click"}`, + }, + }) + ec.assertNone(t, "interaction_click", 100*time.Millisecond) + }) + + t.Run("rate_limited_per_session", func(t *testing.T) { + // Send three binding events 
back-to-back within the 50ms window. + // Only the first should produce a published event. + before := func() int { + ec.mu.Lock() + defer ec.mu.Unlock() + count := 0 + for _, ev := range ec.events { + if ev.Type == EventInteractionClick { + count++ + } + } + return count + } + countBefore := before() + + for range 3 { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "__kernelEvent", + "payload": `{"type":"interaction_click","x":1,"y":1,"selector":"a","tag":"A","text":"x"}`, + }, + }) + } + + // Wait a bit for async delivery, then check only 1 new event was published. + time.Sleep(200 * time.Millisecond) + ec.mu.Lock() + countAfter := 0 + for _, ev := range ec.events { + if ev.Type == EventInteractionClick { + countAfter++ + } + } + ec.mu.Unlock() + assert.Equal(t, countBefore+1, countAfter, "rate limiter should have dropped the 2nd and 3rd events") + }) +} + +func TestPerTargetStateMachines(t *testing.T) { + // attachTarget sends a Target.attachedToTarget message for a page session. + attachTarget := func(srv *testServer, t *testing.T, sessionID, targetID string) { + t.Helper() + srv.sendToMonitor(t, map[string]any{ + "method": "Target.attachedToTarget", + "params": map[string]any{ + "sessionId": sessionID, + "targetInfo": map[string]any{ + "targetId": targetID, "type": "page", + "url": "about:blank", "attached": true, + }, + "waitingForDebugger": false, + }, + }) + } + + t.Run("two_tabs_independent", func(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + attachTarget(srv, t, "sess-a", "target-a") + attachTarget(srv, t, "sess-b", "target-b") + + // Navigate sess-a and start a request. 
+ srv.sendToMonitor(t, map[string]any{ + "method": "Page.frameNavigated", "sessionId": "sess-a", + "params": map[string]any{"frame": map[string]any{ + "id": "f-a", "url": "https://a.example.com", "loaderId": "l-a", + }}, + }) + ec.waitFor(t, "page_navigation", 2*time.Second) + + srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", "sessionId": "sess-a", + "params": map[string]any{ + "requestId": "req-a", "type": "Document", "loaderId": "l-a", + "documentURL": "https://a.example.com/", + "request": map[string]any{"method": "GET", "url": "https://a.example.com/"}, + "initiator": map[string]any{"type": "other"}, + }, + }) + ec.waitFor(t, "network_request", 2*time.Second) + + // Navigate sess-b — must not reset sess-a's state machine. + // With per-session state machines, sess-b starts fresh (netPending=0) and + // fires its own network_idle after the 500 ms debounce, independently. + srv.sendToMonitor(t, map[string]any{ + "method": "Page.frameNavigated", "sessionId": "sess-b", + "params": map[string]any{"frame": map[string]any{ + "id": "f-b", "url": "https://b.example.com", "loaderId": "l-b", + }}, + }) + + // Wait past sess-b's 500 ms debounce so its network_idle fires before we + // set our checkpoint. The next new network_idle will then come from sess-a. + time.Sleep(700 * time.Millisecond) + + // Finish sess-a's request; waitForNew captures the current event count so + // sess-b's already-fired network_idle is excluded from the result. 
+ srv.sendToMonitor(t, map[string]any{ + "method": "Network.responseReceived", "sessionId": "sess-a", + "params": map[string]any{ + "requestId": "req-a", "type": "Document", + "response": map[string]any{"status": 200, "statusText": "OK", "headers": map[string]any{}, "mimeType": "text/html"}, + }, + }) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFinished", "sessionId": "sess-a", + "params": map[string]any{"requestId": "req-a"}, + }) + + ev := ec.waitForNew(t, "network_idle", 2*time.Second) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "sess-a", data["session_id"], "network_idle must be attributed to sess-a") + assert.Equal(t, "l-a", data["loader_id"]) + }) + + t.Run("detach_stops_timer", func(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + attachTarget(srv, t, "sess-c", "target-c") + + srv.sendToMonitor(t, map[string]any{ + "method": "Page.frameNavigated", "sessionId": "sess-c", + "params": map[string]any{"frame": map[string]any{ + "id": "f-c", "url": "https://c.example.com", "loaderId": "l-c", + }}, + }) + ec.waitFor(t, "page_navigation", 2*time.Second) + + // Start a request, then finish it (arms the 500 ms network_idle timer). 
+ srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", "sessionId": "sess-c", + "params": map[string]any{ + "requestId": "req-c", "type": "Document", "loaderId": "l-c", + "documentURL": "https://c.example.com/", + "request": map[string]any{"method": "GET", "url": "https://c.example.com/"}, + "initiator": map[string]any{"type": "other"}, + }, + }) + ec.waitFor(t, "network_request", 2*time.Second) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFinished", "sessionId": "sess-c", + "params": map[string]any{"requestId": "req-c"}, + }) + + // Detach before the 500 ms timer fires; readLoop processes messages in + // order so the stop() call lands well within the debounce window. + srv.sendToMonitor(t, map[string]any{ + "method": "Target.detachedFromTarget", + "params": map[string]any{"sessionId": "sess-c"}, + }) + + ec.assertNone(t, "network_idle", 1200*time.Millisecond) + }) +} diff --git a/server/lib/cdpmonitor/interaction.js b/server/lib/cdpmonitor/interaction.js new file mode 100644 index 00000000..14c34045 --- /dev/null +++ b/server/lib/cdpmonitor/interaction.js @@ -0,0 +1,125 @@ +(function() { + if (window.__kernelEventInjected) return; + var send = window.__kernelEvent; + if (!send) return; + window.__kernelEventInjected = true; + + function sel(el) { + return el.id ? '#' + el.id : (el.className ? '.' + String(el.className).split(' ')[0] : ''); + } + + // Sensitive autocomplete values whose fields must never emit key content. + // Covers passwords, payment card data, and government identifiers. + var SENSITIVE_AUTOCOMPLETE = { + 'current-password': true, 'new-password': true, 'one-time-code': true, + 'cc-number': true, 'cc-csc': true, 'cc-exp': true, 'cc-exp-month': true, + 'cc-exp-year': true, 'cc-name': true, 'cc-type': true, + 'transaction-amount': true, + 'ssn': true, 'passport': true, 'drivers-license': true + }; + + // Sensitive name/id substrings, defence-in-depth for fields without autocomplete. 
+ var SENSITIVE_NAME_RE = /passw|passwd|secret|cvv|cvc|ssn|card.?num|account.?num|pin\b|tax.?id|natl.?id/i; + + // Returns true if the element is one where keystroke content must be suppressed. + function isSensitiveInput(el) { + if (!el || !el.tagName) return false; + var tag = el.tagName.toUpperCase(); + var isEditable = tag === 'INPUT' || tag === 'TEXTAREA' + || el.isContentEditable + || (el.getAttribute && el.getAttribute('role') === 'textbox'); + if (!isEditable) return false; + if (el.type === 'password') return true; + // autocomplete attribute check. + var ac = (el.getAttribute && el.getAttribute('autocomplete')) || ''; + var acTokens = ac.toLowerCase().trim().split(/\s+/); + for (var i = 0; i < acTokens.length; i++) { + if (SENSITIVE_AUTOCOMPLETE[acTokens[i]]) return true; + } + // name/id/aria-label heuristic as fallback, covers custom controls that use ARIA. + var name = (el.name || el.id || (el.getAttribute && el.getAttribute('aria-label')) || ''); + return SENSITIVE_NAME_RE.test(name); + } + + // Returns true if the clicked element's textContent should be suppressed. + // Widens isSensitiveInput to also cover display-only elements (span, div, td, etc.) + // that may render sensitive values, those elements return false from isSensitiveInput + // because they are not editable, but their textContent is still sensitive. + function shouldSuppressClickText(el) { + if (isSensitiveInput(el)) return true; + var id = el.id || ''; + var aria = (el.getAttribute && el.getAttribute('aria-label')) || ''; + return SENSITIVE_NAME_RE.test(id) || SENSITIVE_NAME_RE.test(aria); + } + + // Only reads direct child text nodes and prevents container elements from leaking + // sensitive values rendered in child elements that pass shouldSuppressClickText. 
+ function directText(el) { + var text = ''; + var nodes = el.childNodes; + for (var i = 0; i < nodes.length; i++) { + if (nodes[i].nodeType === 3) { + text += nodes[i].textContent; + } + } + return text.trim(); + } + + document.addEventListener('click', function(e) { + var t = e.target || {}; + // Suppress text capture for sensitive inputs/display elements; record click position/selector only. + // Use directText (not textContent) to avoid leaking sensitive values from child elements. + var text = shouldSuppressClickText(t) ? '' : directText(t).slice(0, 100); + send(JSON.stringify({ + type: 'interaction_click', + x: e.clientX, y: e.clientY, + selector: sel(t), tag: t.tagName || '', + text: text + })); + }, true); + + document.addEventListener('keydown', function(e) { + var t = e.target || {}; + // Never record which key was pressed inside a sensitive field. + if (isSensitiveInput(t)) return; + send(JSON.stringify({ + type: 'interaction_key', + key: e.key, + selector: sel(t), tag: t.tagName || '' + })); + }, true); + + function scrollPos(target) { + if (target === document || target === document.documentElement) { + return {x: window.scrollX, y: window.scrollY}; + } + return {x: target.scrollLeft || 0, y: target.scrollTop || 0}; + } + + var scrollTimer = null; + var scrollStart = null; + var scrollTarget = null; + document.addEventListener('scroll', function(e) { + var target = e.target; + // If target changed mid-scroll, reset tracking for the new target. + if (scrollTarget !== target) { + scrollStart = scrollPos(target); + scrollTarget = target; + } + var fromX = scrollStart.x, fromY = scrollStart.y; + var s = target === document ? 
'document' : sel(target); + if (scrollTimer) clearTimeout(scrollTimer); + scrollTimer = setTimeout(function() { + var pos = scrollPos(target); + if (Math.abs(pos.x - fromX) > 5 || Math.abs(pos.y - fromY) > 5) { + send(JSON.stringify({ + type: 'interaction_scroll_settled', + from_x: fromX, from_y: fromY, + to_x: pos.x, to_y: pos.y, + target_selector: s + })); + } + scrollTarget = null; + }, 300); + }, true); +})(); \ No newline at end of file diff --git a/server/lib/cdpmonitor/monitor.go b/server/lib/cdpmonitor/monitor.go index e7cf747f..c25dfac0 100644 --- a/server/lib/cdpmonitor/monitor.go +++ b/server/lib/cdpmonitor/monitor.go @@ -2,8 +2,14 @@ package cdpmonitor import ( "context" + "encoding/json" + "fmt" + "log/slog" + "sync" "sync/atomic" + "time" + "github.com/coder/websocket" "github.com/kernel/kernel-images/server/lib/events" ) @@ -16,15 +22,75 @@ type UpstreamProvider interface { // PublishFunc publishes an Event to the pipeline. type PublishFunc func(ev events.Event) +const wsReadLimit = 8 * 1024 * 1024 + // Monitor manages a CDP WebSocket connection with auto-attach session fan-out. -// Single-use per capture session: call Start to begin, Stop to tear down. +// WebSocket concurrency: coder/websocket guarantees that one concurrent Read +// and one concurrent Write are safe. The readLoop holds the sole Read; all +// writes go through send, which serialises them with conn.Write's internal +// lock. No external write mutex is needed. type Monitor struct { + upstreamMgr UpstreamProvider + publish PublishFunc + displayNum int + log *slog.Logger + + // lifeMu serializes Start, Stop, and restartReadLoop to prevent races on + // conn, lifecycleCtx, cancel, and done. 
+ lifeMu sync.Mutex + conn *websocket.Conn + + nextID atomic.Int64 + pendMu sync.Mutex + pending map[int64]chan cdpMessage + + sessionsMu sync.RWMutex + sessions map[string]targetInfo // sessionID → targetInfo + mainSessionID atomic.Value // string; set on first top-level frameNavigated, cleared on reconnect + + pendReqMu sync.Mutex + pendingRequests map[string]networkReqState // requestId → networkReqState + + computedStates map[string]*computedState // sessionID → state machine; guarded by sessionsMu + + lastScreenshotAt atomic.Int64 // unix millis of last capture + screenshotInFlight atomic.Bool // true while a captureScreenshot goroutine is running + screenshotFn func(ctx context.Context, displayNum int) ([]byte, error) // nil → real ffmpeg + + // bindingRateMu guards bindingLastSeen. + bindingRateMu sync.Mutex + bindingLastSeen map[string]time.Time // sessionID → last accepted binding event time + + // asyncWg tracks all goroutines except readLoop (which is tracked via done). + // subscribeToUpstream and sweepPendingRequests are included so Stop() can + // wait for them to exit before returning. + asyncWg sync.WaitGroup + restartMu sync.Mutex // serializes handleUpstreamRestart to prevent overlapping reconnects + + lifecycleCtx context.Context // cancelled on Stop() + cancel context.CancelFunc + done chan struct{} + readReady chan struct{} // closed when readLoop has started reading + running atomic.Bool } // New creates a Monitor. displayNum is the X display for ffmpeg screenshots. 
-func New(_ UpstreamProvider, _ PublishFunc, _ int) *Monitor { - return &Monitor{} +func New(upstreamMgr UpstreamProvider, publish PublishFunc, displayNum int, log *slog.Logger) *Monitor { + m := &Monitor{ + upstreamMgr: upstreamMgr, + publish: publish, + displayNum: displayNum, + log: log, + sessions: make(map[string]targetInfo), + computedStates: make(map[string]*computedState), + pending: make(map[int64]chan cdpMessage), + pendingRequests: make(map[string]networkReqState), + bindingLastSeen: make(map[string]time.Time), + } + m.lifecycleCtx = context.Background() + m.mainSessionID.Store(mainSessionUnset) + return m } // IsRunning reports whether the monitor is actively capturing. @@ -33,9 +99,493 @@ func (m *Monitor) IsRunning() bool { } // Start begins CDP capture. Restarts if already running. -func (m *Monitor) Start(_ context.Context) error { +// Not concurrency-safe; callers must serialize Start calls. +func (m *Monitor) Start(ctx context.Context) error { + m.Stop() // no-op if not running + + devtoolsURL := m.upstreamMgr.Current() + if devtoolsURL == "" { + return fmt.Errorf("cdpmonitor: no DevTools URL available") + } + + ctx, cancel := context.WithCancel(ctx) + + conn, _, err := websocket.Dial(ctx, devtoolsURL, nil) + if err != nil { + cancel() + return fmt.Errorf("cdpmonitor: dial %s: %w", devtoolsURL, err) + } + conn.SetReadLimit(wsReadLimit) + + m.lifeMu.Lock() + m.conn = conn + m.lifecycleCtx = ctx + m.cancel = cancel + m.done = make(chan struct{}) + m.readReady = make(chan struct{}) + m.lifeMu.Unlock() + + m.running.Store(true) + m.log.Info("cdpmonitor: started", "url", devtoolsURL) + + go m.readLoop(ctx) + m.asyncWg.Go(func() { m.subscribeToUpstream(ctx) }) + m.asyncWg.Go(func() { m.sweepPendingRequests(ctx) }) + m.asyncWg.Go(func() { m.initSession(ctx) }) + return nil } -// Stop tears down the monitor. Safe to call multiple times. -func (m *Monitor) Stop() {} +// Stop cancels the context and waits for goroutines to exit. 
func (m *Monitor) Stop() {
	// Swap reports the previous value: false means the monitor was not marked
	// running (never started, already stopped, or marked stopped elsewhere).
	// Even then, cancel whatever lifecycle context is still stored and wait for
	// the tracked goroutines so none are left behind.
	if !m.running.Swap(false) {
		m.lifeMu.Lock()
		cancel := m.cancel
		m.lifeMu.Unlock()
		if cancel != nil {
			cancel()
		}
		m.asyncWg.Wait()
		return
	}
	m.log.Info("cdpmonitor: stopping")

	// Cancel under lifeMu and snapshot done so a concurrent restartReadLoop
	// cannot swap the channel between the cancel and the wait below.
	m.lifeMu.Lock()
	if m.cancel != nil {
		m.cancel()
	}
	done := m.done
	m.lifeMu.Unlock()

	// done is closed by readLoop when it returns.
	if done != nil {
		<-done
	}

	// Wait for all in-flight async goroutines (fetchResponseBody, enableDomains,
	// screenshots) to finish before closing the connection they may be writing to.
	m.asyncWg.Wait()

	m.lifeMu.Lock()
	if m.conn != nil {
		// Best-effort close: the connection is being discarded either way, so
		// the close error carries no actionable information.
		_ = m.conn.Close(websocket.StatusNormalClosure, "stopped")
		m.conn = nil
	}
	m.lifeMu.Unlock()

	m.clearState()
	m.log.Info("cdpmonitor: stopped")
}

// clearState resets sessions, pending requests, and computed state.
// It also fails all in-flight send() calls so their goroutines are unblocked.
func (m *Monitor) clearState() {
	// Swap the maps while holding sessionsMu, but call cs.stop() only after
	// releasing it, so stop() never runs under the sessions lock.
	m.sessionsMu.Lock()
	prev := m.computedStates
	m.sessions = make(map[string]targetInfo)
	m.computedStates = make(map[string]*computedState)
	m.sessionsMu.Unlock()
	for _, cs := range prev {
		cs.stop()
	}
	m.mainSessionID.Store(mainSessionUnset)

	m.pendReqMu.Lock()
	m.pendingRequests = make(map[string]networkReqState)
	m.pendReqMu.Unlock()

	m.bindingRateMu.Lock()
	m.bindingLastSeen = make(map[string]time.Time)
	m.bindingRateMu.Unlock()

	m.failPendingCommands()
}

// pendingRequestTTL is the maximum age of a pendingRequests entry before the
// sweeper evicts it.
const pendingRequestTTL = 5 * time.Minute

// sweepInterval is how often the sweeper scans pendingRequests.
const sweepInterval = 1 * time.Minute

// sweepPendingRequests periodically evicts networkReqState entries that have
// been in the map for longer than pendingRequestTTL. This bounds map growth on
// long-lived SPAs where loadingFinished never arrives (e.g. tabs closed mid-flight).
// It exits when ctx is cancelled (Stop/reconnect).
+func (m *Monitor) sweepPendingRequests(ctx context.Context) { + ticker := time.NewTicker(sweepInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + var toSweep []networkReqState + m.pendReqMu.Lock() + for id, state := range m.pendingRequests { + if now.Sub(state.addedAt) > pendingRequestTTL { + delete(m.pendingRequests, id) + toSweep = append(toSweep, state) + } + } + m.pendReqMu.Unlock() + for _, state := range toSweep { + if cs := m.computedFor(state.sessionID); cs != nil { + cs.onLoadingFinished() + } + } + } + } +} + +// computedFor returns the computedState for the given sessionID, or nil if none exists. +func (m *Monitor) computedFor(sessionID string) *computedState { + m.sessionsMu.RLock() + cs := m.computedStates[sessionID] + m.sessionsMu.RUnlock() + return cs +} + +// failPendingCommands unblocks all in-flight send() calls by delivering an +// error response. This prevents goroutine leaks when the connection is torn +// down during reconnect. +func (m *Monitor) failPendingCommands() { + m.pendMu.Lock() + old := m.pending + m.pending = make(map[int64]chan cdpMessage) + m.pendMu.Unlock() + + disconnectErr := &cdpError{Code: -1, Message: "connection closed"} + for _, ch := range old { + select { + case ch <- cdpMessage{Error: disconnectErr}: + default: + } + } +} + +// readLoop reads CDP messages, routing responses to pending callers and dispatching events. +// On read error (WS drop) this goroutine returns. Reconnection is driven by +// subscribeToUpstream: the UpstreamProvider always pushes a fresh devtools URL +// when the browser process restarts, so same-URL redial is not needed here. +func (m *Monitor) readLoop(ctx context.Context) { + m.lifeMu.Lock() + done := m.done + conn := m.conn + readReady := m.readReady + m.lifeMu.Unlock() + defer close(done) + + if conn == nil { + return + } + + // Signal that readLoop is ready to receive responses. 
+ close(readReady) + + for { + _, b, err := conn.Read(ctx) + if err != nil { + if ctx.Err() == nil { + m.log.Warn("cdpmonitor: read loop exiting on unexpected error", "err", err) + } + return + } + + var msg cdpMessage + if err := json.Unmarshal(b, &msg); err != nil { + m.log.Warn("cdpmonitor: dropping malformed CDP message", "err", err) + continue + } + + if msg.ID != nil { + m.pendMu.Lock() + ch, ok := m.pending[*msg.ID] + m.pendMu.Unlock() + if ok { + select { + case ch <- msg: + default: + // send() already timed out and deregistered; discard. + } + } + continue + } + + m.dispatchEvent(msg) + } +} + +const sendTimeout = 30 * time.Second + +// send issues a CDP command and blocks until the response arrives. +// A 30 s deadline is applied so a non-responsive Chrome cannot stall +// callers indefinitely; the caller's own deadline (if shorter) wins. +func (m *Monitor) send(ctx context.Context, method string, params any, sessionID string) (json.RawMessage, error) { + ctx, cancel := context.WithTimeout(ctx, sendTimeout) + defer cancel() + + id := m.nextID.Add(1) + + var rawParams json.RawMessage + if params != nil { + b, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("marshal params: %w", err) + } + rawParams = b + } + + req := cdpMessage{ID: &id, Method: method, Params: rawParams, SessionID: sessionID} + reqBytes, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + + ch := make(chan cdpMessage, 1) + m.pendMu.Lock() + m.pending[id] = ch + m.pendMu.Unlock() + defer func() { + m.pendMu.Lock() + delete(m.pending, id) + m.pendMu.Unlock() + }() + + m.lifeMu.Lock() + conn := m.conn + m.lifeMu.Unlock() + if conn == nil { + return nil, fmt.Errorf("cdpmonitor: connection not open") + } + + // coder/websocket allows concurrent Read + Write on the same Conn. 
+ if err := conn.Write(ctx, websocket.MessageText, reqBytes); err != nil { + return nil, fmt.Errorf("write: %w", err) + } + + select { + case resp := <-ch: + if resp.Error != nil { + return nil, resp.Error + } + return resp.Result, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// initSession enables CDP domains, injects the interaction-tracking script, +// and manually attaches to any targets already open when the monitor started. +// It waits for readLoop to be ready before sending any commands. +func (m *Monitor) initSession(ctx context.Context) { + m.lifeMu.Lock() + readReady := m.readReady + m.lifeMu.Unlock() + select { + case <-readReady: + case <-ctx.Done(): + return + } + + if _, err := m.send(ctx, "Target.setAutoAttach", map[string]any{ + "autoAttach": true, + "waitForDebuggerOnStart": false, + "flatten": true, + }, ""); err != nil && ctx.Err() == nil { + // Without auto-attach the monitor will never see new targets: treat as fatal. + m.log.Error("cdpmonitor: Target.setAutoAttach failed — monitor will not observe new targets", "err", err) + m.publish(events.Event{ + Ts: time.Now().UnixMicro(), + Type: EventMonitorInitFailed, + Category: events.CategorySystem, + Source: events.Source{Kind: events.KindLocalProcess}, + Data: json.RawMessage(`{"step":"Target.setAutoAttach"}`), + }) + return + } + + m.attachExistingTargets(ctx) +} + +// attachExistingTargets fetches all open targets and attaches to any that are +// not already tracked. This catches pages that were open before Start() was called. 
+func (m *Monitor) attachExistingTargets(ctx context.Context) { + result, err := m.send(ctx, "Target.getTargets", nil, "") + if err != nil { + return + } + var resp struct { + TargetInfos []cdpTargetTargetInfo `json:"targetInfos"` + } + if err := json.Unmarshal(result, &resp); err != nil { + return + } + m.sessionsMu.RLock() + attached := make(map[string]bool, len(m.sessions)) + for _, info := range m.sessions { + attached[info.targetID] = true + } + m.sessionsMu.RUnlock() + + for _, ti := range resp.TargetInfos { + if ti.Type != targetTypePage || attached[ti.TargetID] { + continue + } + targetID := ti.TargetID + m.asyncWg.Go(func() { + _, _ = m.send(ctx, "Target.attachToTarget", map[string]any{ + "targetId": targetID, + "flatten": true, + }, "") + }) + } +} + +// restartReadLoop waits for the current readLoop to exit, then starts a new one. +// Returns false if the context was cancelled before the restart completed. +func (m *Monitor) restartReadLoop(ctx context.Context) bool { + m.lifeMu.Lock() + done := m.done + m.lifeMu.Unlock() + + // Wait for old readLoop, but bail if context is cancelled (e.g. Stop called). + select { + case <-done: + case <-ctx.Done(): + return false + } + + m.lifeMu.Lock() + m.done = make(chan struct{}) + m.readReady = make(chan struct{}) + m.lifeMu.Unlock() + + go m.readLoop(ctx) + return true +} + +// subscribeToUpstream reconnects with backoff on Chrome restarts, publishing disconnect/reconnect events. +func (m *Monitor) subscribeToUpstream(ctx context.Context) { + ch, cancel := m.upstreamMgr.Subscribe() + defer cancel() + + for { + select { + case <-ctx.Done(): + return + case newURL, ok := <-ch: + if !ok { + return + } + m.handleUpstreamRestart(ctx, newURL) + } + } +} + +// handleUpstreamRestart tears down the old connection, reconnects with backoff, +// and re-initializes the CDP session. Serialized by restartMu to prevent +// overlapping reconnects from rapid successive Chrome restarts. 
+func (m *Monitor) handleUpstreamRestart(ctx context.Context, newURL string) { + m.restartMu.Lock() + defer m.restartMu.Unlock() + + if ctx.Err() != nil { + return + } + m.publish(events.Event{ + Ts: time.Now().UnixMicro(), + Type: EventMonitorDisconnected, + Category: events.CategorySystem, + Source: events.Source{Kind: events.KindLocalProcess}, + Data: json.RawMessage(`{"reason":"` + ReasonChromeRestarted + `"}`), + }) + + startReconnect := time.Now() + + m.lifeMu.Lock() + if m.conn != nil { + _ = m.conn.Close(websocket.StatusNormalClosure, "reconnecting") + m.conn = nil + } + m.lifeMu.Unlock() + + if !m.reconnectWithBackoff(ctx, newURL) { + // Context cancelled means Stop() was called, not a failure. + if ctx.Err() == nil { + // Cancel the lifecycle context before setting running=false so that + // goroutines blocked on ctx.Done() begin exiting. If we set + // running=false first, a concurrent Stop() call returns immediately + // without cancelling, permanently orphaning those goroutines in asyncWg. + m.lifeMu.Lock() + if m.cancel != nil { + m.cancel() + } + m.lifeMu.Unlock() + m.clearState() + m.running.Store(false) + m.publish(events.Event{ + Ts: time.Now().UnixMicro(), + Type: EventMonitorReconnectFailed, + Category: events.CategorySystem, + Source: events.Source{Kind: events.KindLocalProcess}, + Data: json.RawMessage(`{"reason":"` + ReasonReconnectExhausted + `"}`), + }) + } + return + } + + // restartReadLoop waits for the old readLoop to exit before returning, + // so clearState runs only after the old loop has stopped touching shared state. 
+ if !m.restartReadLoop(ctx) { + return + } + m.clearState() + + m.asyncWg.Go(func() { m.initSession(ctx) }) + reconnectDurationMs := time.Since(startReconnect).Milliseconds() + m.log.Info("cdpmonitor: reconnected", "url", newURL, "duration_ms", reconnectDurationMs) + + m.publish(events.Event{ + Ts: time.Now().UnixMicro(), + Type: EventMonitorReconnected, + Category: events.CategorySystem, + Source: events.Source{Kind: events.KindLocalProcess}, + Data: json.RawMessage(fmt.Sprintf(`{"reconnect_duration_ms":%d}`, reconnectDurationMs)), + }) +} + +const maxReconnectAttempts = 10 + +var reconnectBackoffs = []time.Duration{ + 250 * time.Millisecond, + 500 * time.Millisecond, + 1 * time.Second, + 2 * time.Second, +} + +// reconnectWithBackoff attempts to dial newURL up to maxReconnectAttempts times with exponential backoff. +func (m *Monitor) reconnectWithBackoff(ctx context.Context, newURL string) bool { + for attempt := range maxReconnectAttempts { + if ctx.Err() != nil { + return false + } + + if attempt > 0 { + idx := min(attempt-1, len(reconnectBackoffs)-1) + select { + case <-ctx.Done(): + return false + case <-time.After(reconnectBackoffs[idx]): + } + } + + conn, _, err := websocket.Dial(ctx, newURL, nil) + if err != nil { + m.log.Warn("cdpmonitor: reconnect attempt failed", "attempt", attempt+1, "max_attempts", maxReconnectAttempts, "url", newURL, "err", err) + continue + } + conn.SetReadLimit(wsReadLimit) + + m.lifeMu.Lock() + m.conn = conn + m.lifeMu.Unlock() + return true + } + return false +} diff --git a/server/lib/cdpmonitor/monitor_test.go b/server/lib/cdpmonitor/monitor_test.go new file mode 100644 index 00000000..07267078 --- /dev/null +++ b/server/lib/cdpmonitor/monitor_test.go @@ -0,0 +1,338 @@ +package cdpmonitor + +import ( + "context" + "encoding/json" + "sync/atomic" + "testing" + "time" + + "github.com/kernel/kernel-images/server/lib/events" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// discardLogger is 
defined in cdp_test.go (package-level, shared across test files). + +func TestLifecycle(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + ec := newEventCollector() + upstream := newTestUpstream(srv.wsURL()) + m := New(upstream, ec.publishFn(), 99, discardLogger) + + assert.False(t, m.IsRunning(), "idle at boot") + + require.NoError(t, m.Start(context.Background())) + assert.True(t, m.IsRunning(), "running after Start") + srv.readFromMonitor(t, 2*time.Second) + + m.Stop() + assert.False(t, m.IsRunning(), "stopped after Stop") + + require.NoError(t, m.Start(context.Background())) + assert.True(t, m.IsRunning(), "running after second Start") + srv.readFromMonitor(t, 2*time.Second) + + require.NoError(t, m.Start(context.Background())) + assert.True(t, m.IsRunning(), "running after implicit restart") + + m.Stop() + assert.False(t, m.IsRunning(), "stopped at end") +} + +func TestReconnect(t *testing.T) { + srv1 := newTestServer(t) + + upstream := newTestUpstream(srv1.wsURL()) + ec := newEventCollector() + m := New(upstream, ec.publishFn(), 99, discardLogger) + require.NoError(t, m.Start(context.Background())) + defer m.Stop() + + srv1.readFromMonitor(t, 2*time.Second) + + srv2 := newTestServer(t) + defer srv2.close() + defer srv1.close() + + upstream.notifyRestart(srv2.wsURL()) + + ec.waitFor(t, "monitor_disconnected", 3*time.Second) + srv2.readFromMonitor(t, 5*time.Second) + + ev := ec.waitFor(t, "monitor_reconnected", 3*time.Second) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + _, ok := data["reconnect_duration_ms"] + assert.True(t, ok, "missing reconnect_duration_ms") +} + +func TestScreenshot(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + m, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + var captureCount atomic.Int32 + m.screenshotFn = func(ctx context.Context, displayNum int) ([]byte, error) { + captureCount.Add(1) + return minimalPNG, nil + } + + t.Run("capture_and_publish", 
func(t *testing.T) { + m.tryScreenshot(context.Background(), "Page.loadEventFired", "") + require.Eventually(t, func() bool { return captureCount.Load() == 1 }, 2*time.Second, 20*time.Millisecond) + + ev := ec.waitFor(t, "monitor_screenshot", 2*time.Second) + assert.Equal(t, events.CategorySystem, ev.Category) + assert.Equal(t, events.KindLocalProcess, ev.Source.Kind) + assert.Equal(t, "Page.loadEventFired", ev.Source.Event) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.NotEmpty(t, data["png"]) + }) + + t.Run("rate_limited", func(t *testing.T) { + before := captureCount.Load() + m.tryScreenshot(context.Background(), "Page.loadEventFired", "") + time.Sleep(100 * time.Millisecond) + assert.Equal(t, before, captureCount.Load(), "should be rate-limited within 2s") + }) + + t.Run("captures_after_cooldown", func(t *testing.T) { + m.lastScreenshotAt.Store(time.Now().Add(-3 * time.Second).UnixMilli()) + before := captureCount.Load() + m.tryScreenshot(context.Background(), "Page.loadEventFired", "") + require.Eventually(t, func() bool { return captureCount.Load() > before }, 2*time.Second, 20*time.Millisecond) + }) +} + +// TestFailPendingCommandsUnblocksSend verifies that clearState (called during +// reconnect) unblocks any goroutine blocked in send() by delivering an error. +func TestFailPendingCommandsUnblocksSend(t *testing.T) { + ec := newEventCollector() + upstream := newTestUpstream("ws://127.0.0.1:0") + m := New(upstream, ec.publishFn(), 0, discardLogger) + + // Pre-register a fake pending command channel as if send() had registered it. + id := int64(42) + ch := make(chan cdpMessage, 1) + m.pendMu.Lock() + m.pending[id] = ch + m.pendMu.Unlock() + + // failPendingCommands should deliver an error message to ch without blocking. 
+ done := make(chan struct{}) + go func() { + m.failPendingCommands() + close(done) + }() + + select { + case msg := <-ch: + require.NotNil(t, msg.Error, "expected error response from failPendingCommands") + assert.Equal(t, -1, msg.Error.Code) + case <-time.After(2 * time.Second): + t.Fatal("failPendingCommands did not unblock the pending channel") + } + <-done +} + +// TestInitSessionAutoAttachFailure verifies that a monitor_init_failed event is +// published (and the monitor logs the failure) when Target.setAutoAttach returns +// an error. +func TestInitSessionAutoAttachFailure(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + ec := newEventCollector() + upstream := newTestUpstream(srv.wsURL()) + m := New(upstream, ec.publishFn(), 99, discardLogger) + require.NoError(t, m.Start(context.Background())) + defer m.Stop() + + stopResponder := make(chan struct{}) + defer close(stopResponder) + + go listenAndRespond(srv, stopResponder, func(msg cdpMessage) any { + if msg.Method == "Target.setAutoAttach" { + return map[string]any{ + "id": msg.ID, + "error": map[string]any{"code": -32601, "message": "Method not found"}, + } + } + return nil + }) + + ec.waitFor(t, EventMonitorInitFailed, 3*time.Second) +} + +func TestAutoAttach(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + ec := newEventCollector() + upstream := newTestUpstream(srv.wsURL()) + m := New(upstream, ec.publishFn(), 99, discardLogger) + require.NoError(t, m.Start(context.Background())) + defer m.Stop() + + msg := srv.readFromMonitor(t, 3*time.Second) + assert.Equal(t, "Target.setAutoAttach", msg.Method) + + var params struct { + AutoAttach bool `json:"autoAttach"` + WaitForDebuggerOnStart bool `json:"waitForDebuggerOnStart"` + Flatten bool `json:"flatten"` + } + require.NoError(t, json.Unmarshal(msg.Params, ¶ms)) + assert.True(t, params.AutoAttach) + assert.False(t, params.WaitForDebuggerOnStart) + assert.True(t, params.Flatten) + + stopResponder := make(chan struct{}) + go 
listenAndRespond(srv, stopResponder, nil) + defer close(stopResponder) + srv.sendToMonitor(t, map[string]any{"id": msg.ID, "result": map[string]any{}}) + + srv.sendToMonitor(t, map[string]any{ + "method": "Target.attachedToTarget", + "params": map[string]any{ + "sessionId": "session-abc", + "targetInfo": map[string]any{"targetId": "target-xyz", "type": "page", "url": "https://example.com"}, + }, + }) + require.Eventually(t, func() bool { + m.sessionsMu.RLock() + defer m.sessionsMu.RUnlock() + _, ok := m.sessions["session-abc"] + return ok + }, 2*time.Second, 50*time.Millisecond, "session not stored") + + m.sessionsMu.RLock() + info := m.sessions["session-abc"] + m.sessionsMu.RUnlock() + assert.Equal(t, "target-xyz", info.targetID) + assert.Equal(t, "page", info.targetType) +} + +func TestAttachExistingTargets(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + responder := func(msg cdpMessage) any { + switch msg.Method { + case "Target.getTargets": + return map[string]any{ + "id": msg.ID, + "result": map[string]any{ + "targetInfos": []any{ + map[string]any{"targetId": "existing-1", "type": "page", "url": "https://preexisting.example.com"}, + }, + }, + } + case "Target.attachToTarget": + srv.sendToMonitor(t, map[string]any{ + "method": "Target.attachedToTarget", + "params": map[string]any{ + "sessionId": "session-existing-1", + "targetInfo": map[string]any{"targetId": "existing-1", "type": "page", "url": "https://preexisting.example.com"}, + }, + }) + return map[string]any{"id": msg.ID, "result": map[string]any{"sessionId": "session-existing-1"}} + } + return nil + } + + m, _, cleanup := startMonitor(t, srv, responder) + defer cleanup() + + require.Eventually(t, func() bool { + m.sessionsMu.RLock() + defer m.sessionsMu.RUnlock() + _, ok := m.sessions["session-existing-1"] + return ok + }, 3*time.Second, 50*time.Millisecond, "existing target not auto-attached") + + m.sessionsMu.RLock() + info := m.sessions["session-existing-1"] + m.sessionsMu.RUnlock() 
+ assert.Equal(t, "existing-1", info.targetID) +} + +// TestRedirectCounter verifies that redirect hops (same requestId, multiple +// requestWillBeSent) do not double-increment netPending, which would permanently +// block network_idle. +func TestRedirectCounter(t *testing.T) { + m, ec := newComputedMonitor(t) + navigateMonitor(m, "https://example.com") + + initiator := json.RawMessage(`{"type":"other"}`) + // First requestWillBeSent — genuine new request. + m.handleNetworkRequest(cdpNetworkRequestWillBeSentParams{ + RequestID: "r-redirect", + Type: "Document", + Request: cdpNetworkRequest{Method: "GET", URL: "https://example.com/old"}, + Initiator: initiator, + }, "s1") + + // Second requestWillBeSent with the same requestId — this is the redirect hop. + m.handleNetworkRequest(cdpNetworkRequestWillBeSentParams{ + RequestID: "r-redirect", + Type: "Document", + Request: cdpNetworkRequest{Method: "GET", URL: "https://example.com/new"}, + Initiator: initiator, + }, "s1") + + // Only one loadingFinished fires per redirect chain. + m.handleLoadingFinished(context.Background(), cdpNetworkLoadingFinishedParams{RequestID: "r-redirect"}, "s1") + + // If netPending was double-incremented, network_idle would never fire. + ec.waitFor(t, "network_idle", 2*time.Second) +} + +// TestSubframeNavigationNoReset verifies that a frameNavigated event with a +// non-empty parentId does not reset computed state (netPending, timers, etc.). +func TestSubframeNavigationNoReset(t *testing.T) { + m, ec := newComputedMonitor(t) + navigateMonitor(m, "https://example.com") // top-level nav, sets mainSessionID + + // Start a request on the main frame. + simulateRequest(m, "main-req") + + // An iframe navigates — should not reset state or clear pendingRequests. 
+ m.handleFrameNavigated(cdpPageFrameNavigatedParams{ + Frame: cdpPageFrame{ + ID: "iframe-frame", + ParentID: "top-frame", + URL: "https://iframe.example.com", + }, + }, "s1") + + // mainSessionID should still be "s1", not reset by the subframe nav. + assert.Equal(t, "s1", m.mainSessionID.Load(), "mainSessionID should not change on subframe nav") + + // Finishing the main request should still drive network_idle (state not reset). + simulateFinished(m, "main-req") + ec.waitFor(t, "network_idle", 2*time.Second) +} + +// TestIframeTargetNoStateMachine verifies that attaching an iframe target does +// not create a computedState. Only page targets get state machines; iframes share +// the CDP page domains but must not generate computed events like navigation_settled. +func TestIframeTargetNoStateMachine(t *testing.T) { + m, _ := newComputedMonitor(t) + m.sessionsMu.Lock() + m.sessions["iframe-session"] = targetInfo{targetID: "iframe-target", targetType: "iframe"} + // Intentionally do NOT create a computedState — mirrors handleAttachedToTarget behaviour. + m.sessionsMu.Unlock() + + m.sessionsMu.RLock() + cs := m.computedStates["iframe-session"] + m.sessionsMu.RUnlock() + + assert.Nil(t, cs, "iframe target must not have a computedState") +} diff --git a/server/lib/cdpmonitor/screenshot.go b/server/lib/cdpmonitor/screenshot.go new file mode 100644 index 00000000..b8f17f4a --- /dev/null +++ b/server/lib/cdpmonitor/screenshot.go @@ -0,0 +1,119 @@ +package cdpmonitor + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "maps" + "os/exec" + "time" + + "github.com/kernel/kernel-images/server/lib/events" +) + +// tryScreenshot fires a screenshot if the 2s rate-limit window has elapsed. +// screenshotInFlight CAS is checked first so that a blocked attempt never +// consumes the rate-limit window without starting a capture. 
lastScreenshotAt +// is only advanced after the in-flight slot is claimed; if that CAS then loses +// to a concurrent goroutine the slot is released and we return cleanly. +// sourceEvent is the CDP event that triggered the capture; sessionID is used +// to snapshot nav context before the async goroutine fires. +func (m *Monitor) tryScreenshot(ctx context.Context, sourceEvent, sessionID string) { + now := time.Now().UnixMilli() + last := m.lastScreenshotAt.Load() + if now-last < 2000 { + return + } + if !m.screenshotInFlight.CompareAndSwap(false, true) { + return + } + if !m.lastScreenshotAt.CompareAndSwap(last, now) { + m.screenshotInFlight.Store(false) + return + } + var navData json.RawMessage + var navMeta map[string]string + if cs := m.computedFor(sessionID); cs != nil { + navData, navMeta = cs.navSnapshot() + } + m.asyncWg.Go(func() { + defer m.screenshotInFlight.Store(false) + m.captureScreenshot(ctx, sourceEvent, navData, navMeta) + }) +} + +const screenshotTimeout = 10 * time.Second + +// captureScreenshot takes a screenshot via ffmpeg x11grab (or the screenshotFn +// seam in tests), optionally downscales it, and publishes a screenshot event. +// navData and navMeta are pre-snapped from the owning session's computedState; +// they may be nil if no state machine exists for the session. +func (m *Monitor) captureScreenshot(parentCtx context.Context, sourceEvent string, navData json.RawMessage, navMeta map[string]string) { + ctx, cancel := context.WithTimeout(parentCtx, screenshotTimeout) + defer cancel() + var pngBytes []byte + var err error + + if m.screenshotFn != nil { + pngBytes, err = m.screenshotFn(ctx, m.displayNum) + } else { + pngBytes, err = captureViaFFmpeg(ctx, m.displayNum, 1) + } + if err != nil { + m.log.Warn("cdpmonitor: screenshot capture failed", "err", err) + return + } + + // Downscale if base64 output would exceed ~972KB (~729KB raw × 4/3 base64 inflation). 
+ const rawThreshold = 729 * 1024 + for scale := 2; len(pngBytes) > rawThreshold && scale <= 16 && m.screenshotFn == nil; scale *= 2 { + pngBytes, err = captureViaFFmpeg(ctx, m.displayNum, scale) + if err != nil { + m.log.Warn("cdpmonitor: screenshot downscale failed", "scale", scale, "err", err) + return + } + } + + encoded := base64.StdEncoding.EncodeToString(pngBytes) + payload := map[string]any{screenshotDataKey: encoded} + if navData != nil { + var nav map[string]any + if json.Unmarshal(navData, &nav) == nil { + maps.Copy(payload, nav) + } + } + data, _ := json.Marshal(payload) + + m.publish(events.Event{ + Ts: time.Now().UnixMicro(), + Type: EventScreenshot, + Category: events.CategorySystem, + Source: events.Source{Kind: events.KindLocalProcess, Event: sourceEvent, Metadata: navMeta}, + Data: data, + }) +} + +// captureViaFFmpeg runs ffmpeg x11grab to capture a PNG screenshot. +// If divisor > 1, a scale filter is applied to reduce the output size. +func captureViaFFmpeg(ctx context.Context, displayNum, divisor int) ([]byte, error) { + args := []string{ + "-f", "x11grab", + "-i", fmt.Sprintf(":%d", displayNum), + "-vframes", "1", + } + if divisor > 1 { + args = append(args, "-vf", fmt.Sprintf("scale=iw/%d:ih/%d", divisor, divisor)) + } + args = append(args, "-f", "image2", "pipe:1") + + var out, stderr bytes.Buffer + cmd := exec.CommandContext(ctx, "ffmpeg", args...) 
+ cmd.Stdout = &out + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("%w: %s", err, stderr.String()) + } + return out.Bytes(), nil +} diff --git a/server/lib/cdpmonitor/testdata/Network_loadingFailed.json b/server/lib/cdpmonitor/testdata/Network_loadingFailed.json new file mode 100644 index 00000000..ea3fcf76 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Network_loadingFailed.json @@ -0,0 +1,8 @@ +{ + "requestId": "1000.6", + "timestamp": 1236.1, + "type": "XHR", + "errorText": "net::ERR_CONNECTION_REFUSED", + "canceled": true, + "blockedReason": "other" +} diff --git a/server/lib/cdpmonitor/testdata/Network_loadingFinished.json b/server/lib/cdpmonitor/testdata/Network_loadingFinished.json new file mode 100644 index 00000000..f7242ee8 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Network_loadingFinished.json @@ -0,0 +1,5 @@ +{ + "requestId": "1000.5", + "timestamp": 1235.1, + "encodedDataLength": 1234 +} diff --git a/server/lib/cdpmonitor/testdata/Network_requestWillBeSent.json b/server/lib/cdpmonitor/testdata/Network_requestWillBeSent.json new file mode 100644 index 00000000..f9144da1 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Network_requestWillBeSent.json @@ -0,0 +1,31 @@ +{ + "requestId": "1000.5", + "loaderId": "LOADER123", + "documentURL": "https://example.com/", + "request": { + "url": "https://api.example.com/data", + "method": "POST", + "headers": { + "Content-Type": "application/json", + "User-Agent": "Mozilla/5.0" + }, + "postData": "{\"hello\":\"world\"}", + "hasPostData": true, + "mixedContentType": "none", + "initialPriority": "High", + "referrerPolicy": "strict-origin-when-cross-origin", + "isSameSite": true + }, + "timestamp": 1234.5678, + "wallTime": 1715000000.123, + "initiator": { + "type": "script", + "stack": { + "callFrames": [] + } + }, + "redirectHasExtraInfo": true, + "type": "XHR", + "frameId": "FRAME123", + "hasUserGesture": true +} diff --git 
a/server/lib/cdpmonitor/testdata/Network_responseReceived.json b/server/lib/cdpmonitor/testdata/Network_responseReceived.json new file mode 100644 index 00000000..487a8beb --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Network_responseReceived.json @@ -0,0 +1,36 @@ +{ + "requestId": "1000.5", + "loaderId": "LOADER123", + "timestamp": 1234.9, + "type": "XHR", + "response": { + "url": "https://api.example.com/data", + "status": 200, + "statusText": "OK", + "headers": { + "Content-Type": "application/json", + "Cache-Control": "no-cache" + }, + "mimeType": "application/json", + "charset": "utf-8", + "connectionReused": true, + "connectionId": 42, + "remoteIPAddress": "93.184.216.34", + "remotePort": 443, + "fromDiskCache": false, + "fromServiceWorker": false, + "fromPrefetchCache": false, + "encodedDataLength": 1234, + "timing": { + "requestTime": 1234.5, + "sendStart": 0.5, + "sendEnd": 1.0, + "receiveHeadersEnd": 200 + }, + "responseTime": 1715000000.4, + "protocol": "http/1.1", + "securityState": "secure" + }, + "hasExtraInfo": true, + "frameId": "FRAME123" +} diff --git a/server/lib/cdpmonitor/testdata/Page_domContentEventFired.json b/server/lib/cdpmonitor/testdata/Page_domContentEventFired.json new file mode 100644 index 00000000..2406ebee --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Page_domContentEventFired.json @@ -0,0 +1,3 @@ +{ + "timestamp": 1234.567 +} diff --git a/server/lib/cdpmonitor/testdata/Page_frameNavigated.json b/server/lib/cdpmonitor/testdata/Page_frameNavigated.json new file mode 100644 index 00000000..948dda02 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Page_frameNavigated.json @@ -0,0 +1,14 @@ +{ + "frame": { + "id": "FRAME123", + "loaderId": "LOADER123", + "url": "https://example.com/page", + "domainAndRegistry": "example.com", + "securityOrigin": "https://example.com", + "mimeType": "text/html", + "secureContextType": "Secure", + "crossOriginIsolatedContextType": "NotIsolated", + "gatedAPIFeatures": [] + }, + "type": 
"Navigation" +} diff --git a/server/lib/cdpmonitor/testdata/Page_loadEventFired.json b/server/lib/cdpmonitor/testdata/Page_loadEventFired.json new file mode 100644 index 00000000..f9a4ea55 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Page_loadEventFired.json @@ -0,0 +1,3 @@ +{ + "timestamp": 1234.678 +} diff --git a/server/lib/cdpmonitor/testdata/PerformanceTimeline_timelineEventAdded.json b/server/lib/cdpmonitor/testdata/PerformanceTimeline_timelineEventAdded.json new file mode 100644 index 00000000..02f565a8 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/PerformanceTimeline_timelineEventAdded.json @@ -0,0 +1,20 @@ +{ + "event": { + "frameId": "FRAME123", + "type": "layout-shift", + "name": "layout-shift", + "time": 1234.5, + "duration": 0.5, + "layoutShiftDetails": { + "value": 0.15, + "hadRecentInput": false, + "lastInputTime": 0, + "sources": [ + { + "previousRect": {"x": 0, "y": 0, "width": 100, "height": 100}, + "currentRect": {"x": 0, "y": 50, "width": 100, "height": 100} + } + ] + } + } +} diff --git a/server/lib/cdpmonitor/testdata/Runtime_bindingCalled.json b/server/lib/cdpmonitor/testdata/Runtime_bindingCalled.json new file mode 100644 index 00000000..16512769 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Runtime_bindingCalled.json @@ -0,0 +1,5 @@ +{ + "name": "__kernelEvent", + "payload": "{\"type\":\"interaction_click\",\"x\":10,\"y\":20,\"selector\":\"button\",\"tag\":\"BUTTON\",\"text\":\"OK\"}", + "executionContextId": 3 +} diff --git a/server/lib/cdpmonitor/testdata/Runtime_consoleAPICalled.json b/server/lib/cdpmonitor/testdata/Runtime_consoleAPICalled.json new file mode 100644 index 00000000..6b8dafa0 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Runtime_consoleAPICalled.json @@ -0,0 +1,44 @@ +{ + "type": "log", + "args": [ + { + "type": "string", + "value": "hello world" + }, + { + "type": "number", + "value": 42, + "description": "42" + }, + { + "type": "object", + "subtype": "array", + "className": "Array", + "objectId": 
"{\"injectedScriptId\":1,\"id\":1}", + "description": "Array(2)", + "preview": { + "type": "object", + "subtype": "array", + "description": "Array(2)", + "overflow": false, + "properties": [ + {"name": "0", "type": "number", "value": "1"}, + {"name": "1", "type": "number", "value": "2"} + ] + } + } + ], + "executionContextId": 3, + "timestamp": 1234567890.123, + "stackTrace": { + "callFrames": [ + { + "functionName": "", + "scriptId": "4", + "url": "https://example.com/app.js", + "lineNumber": 10, + "columnNumber": 5 + } + ] + } +} diff --git a/server/lib/cdpmonitor/testdata/Runtime_exceptionThrown.json b/server/lib/cdpmonitor/testdata/Runtime_exceptionThrown.json new file mode 100644 index 00000000..d3230419 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Runtime_exceptionThrown.json @@ -0,0 +1,30 @@ +{ + "timestamp": 1234567890.456, + "exceptionDetails": { + "exceptionId": 1, + "text": "Uncaught", + "lineNumber": 42, + "columnNumber": 7, + "scriptId": "5", + "url": "https://example.com/app.js", + "stackTrace": { + "callFrames": [ + { + "functionName": "throwIt", + "scriptId": "5", + "url": "https://example.com/app.js", + "lineNumber": 42, + "columnNumber": 7 + } + ] + }, + "exception": { + "type": "object", + "subtype": "error", + "className": "TypeError", + "description": "TypeError: Cannot read properties of undefined", + "objectId": "{\"injectedScriptId\":1,\"id\":2}" + }, + "executionContextId": 3 + } +} diff --git a/server/lib/cdpmonitor/testdata/Target_attachedToTarget.json b/server/lib/cdpmonitor/testdata/Target_attachedToTarget.json new file mode 100644 index 00000000..7df628a4 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Target_attachedToTarget.json @@ -0,0 +1,13 @@ +{ + "sessionId": "SESSION123", + "targetInfo": { + "targetId": "TARGET123", + "type": "page", + "title": "Example", + "url": "https://example.com/", + "attached": true, + "canAccessOpener": true, + "browserContextId": "CTX123" + }, + "waitingForDebugger": true +} diff --git 
a/server/lib/cdpmonitor/testdata/Target_detachedFromTarget.json b/server/lib/cdpmonitor/testdata/Target_detachedFromTarget.json new file mode 100644 index 00000000..018d5061 --- /dev/null +++ b/server/lib/cdpmonitor/testdata/Target_detachedFromTarget.json @@ -0,0 +1,4 @@ +{ + "sessionId": "SESSION123", + "targetId": "TARGET123" +} diff --git a/server/lib/cdpmonitor/types.go b/server/lib/cdpmonitor/types.go new file mode 100644 index 00000000..6c34c529 --- /dev/null +++ b/server/lib/cdpmonitor/types.go @@ -0,0 +1,138 @@ +package cdpmonitor + +import ( + "encoding/json" + "fmt" + "time" +) + +// mainSessionUnset is the sentinel stored in mainSessionID before any +// top-level frameNavigated has been received. Using a sentinel prevents the +// empty-string zero value from accidentally matching CDP messages that arrive +// on the root session (sessionID="") before navigation has been recorded. +const mainSessionUnset = "\x00unset" + +// CDP-derived events — direct translations of DevTools Protocol notifications. +// Each maps 1-to-1 with a specific CDP domain event (Runtime.*, Network.*, +// Page.*, PerformanceTimeline.*) received from Chrome. 
const (
	// Runtime domain.

	// EventConsoleLog is emitted for Runtime.consoleAPICalled (non-error types).
	EventConsoleLog = "console_log"
	// EventConsoleError is emitted for Runtime.consoleAPICalled (type=error)
	// or Runtime.exceptionThrown.
	EventConsoleError = "console_error"

	// Network domain.

	// EventNetworkRequest is emitted for Network.requestWillBeSent.
	EventNetworkRequest = "network_request"
	// EventNetworkResponse is emitted for Network.loadingFinished (with prior
	// responseReceived).
	EventNetworkResponse = "network_response"
	// EventNetworkLoadingFailed is emitted for Network.loadingFailed.
	EventNetworkLoadingFailed = "network_loading_failed"

	// Page domain.

	// EventNavigation is emitted for Page.frameNavigated.
	EventNavigation = "page_navigation"
	// EventDOMContentLoaded is emitted for Page.domContentEventFired.
	EventDOMContentLoaded = "page_dom_content_loaded"
	// EventPageLoad is emitted for Page.loadEventFired.
	EventPageLoad = "page_load"

	// PerformanceTimeline domain.

	// EventLayoutShift is emitted for a PerformanceTimeline event of type
	// "layout-shift".
	EventLayoutShift = "page_layout_shift"
	// EventLCP is emitted for a PerformanceTimeline event of type
	// "largest-contentful-paint".
	EventLCP = "page_lcp"

	// Target domain.

	// EventTabOpened is emitted for Target.attachedToTarget for type=page.
	EventTabOpened = "page_tab_opened"
)

// Computed events — synthetic events derived by computed.go state machines.
// None of these correspond to a single CDP notification; they are inferred from
// sequences of CDP events and debounce timers.
const (
	// EventNetworkIdle fires 500 ms after all in-flight requests finish.
	EventNetworkIdle = "network_idle"
	// EventLayoutSettled fires 1 s after page_load with no intervening layout
	// shifts.
	EventLayoutSettled = "page_layout_settled"
	// EventNavigationSettled fires once page_dom_content_loaded, network_idle
	// and page_layout_settled all hold.
	EventNavigationSettled = "page_navigation_settled"
)

// Interaction events — fired by injected page-side JS (interaction.js) via the
// Runtime.bindingCalled mechanism. They originate in the browser's renderer
// process, not from Chrome's network or page domains.
const (
	// EventInteractionClick reports a document click (target selector, coords,
	// text).
	EventInteractionClick = "interaction_click"
	// EventInteractionKey reports a keydown (key name, target selector).
	EventInteractionKey = "interaction_key"
	// EventScrollSettled fires 300 ms after the last scroll event on a target.
	EventScrollSettled = "interaction_scroll_settled"
)

// Monitor lifecycle and internal events — emitted by the monitor itself, not by Chrome.
const (
	// EventScreenshot carries an ffmpeg frame capture taken on page load or
	// JS exception.
	EventScreenshot = "monitor_screenshot"
	// EventMonitorDisconnected signals that the WebSocket to Chrome closed
	// unexpectedly.
	EventMonitorDisconnected = "monitor_disconnected"
	// EventMonitorReconnected signals a successful reconnect after a disconnect.
	EventMonitorReconnected = "monitor_reconnected"
	// EventMonitorReconnectFailed signals that reconnect attempts were exhausted.
	EventMonitorReconnectFailed = "monitor_reconnect_failed"
	// EventMonitorInitFailed signals that the CDP session could not be
	// initialised.
	EventMonitorInitFailed = "monitor_init_failed"
)

// Metadata keys written into events.Source.Metadata for CDP-sourced events.
const (
	MetadataKeyCDPSessionID = "cdp_session_id"
	MetadataKeyTargetID     = "target_id"
	MetadataKeyTargetType   = "target_type"
)

// PerformanceTimeline event type names as they appear in the CDP payload.
const (
	timelineEventLayoutShift = "layout-shift"
	timelineEventLCP         = "largest-contentful-paint"
)

const (
	// targetTypePage is the CDP target type for browser pages (as opposed to
	// workers, iframes, etc.).
	targetTypePage = "page"

	// screenshotDataKey is the screenshot event payload key for the
	// base64-encoded PNG data.
	screenshotDataKey = "png"
)

// Reason values carried in monitor lifecycle event payloads.
const (
	ReasonChromeRestarted    = "chrome_restarted"
	ReasonReconnectExhausted = "reconnect_exhausted"
)

// targetInfo holds metadata about an attached CDP target/session.
type targetInfo struct {
	targetID   string // CDP target identifier
	url        string // URL recorded for the target
	targetType string // CDP target type, e.g. targetTypePage
}

// cdpError is the JSON-RPC error object returned by Chrome.
+type cdpError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func (e *cdpError) Error() string { + return fmt.Sprintf("CDP error %d: %s", e.Code, e.Message) +} + +// cdpMessage is the JSON-RPC message envelope used by Chrome's DevTools Protocol. +// ID is a pointer so we can distinguish an absent id (event) from id=0 (which +// Chrome never sends, but using a pointer is more correct than relying on that). +type cdpMessage struct { + ID *int64 `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + SessionID string `json:"sessionId,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *cdpError `json:"error,omitempty"` +} + +// networkReqState holds request + response metadata until loadingFinished. +type networkReqState struct { + sessionID string + method string + url string + headers json.RawMessage + postData string + resourceType string + loaderID string + frameID string + status int + statusText string + resHeaders json.RawMessage + mimeType string + addedAt time.Time // for TTL eviction +} + +// navContext carries the identity of the navigation that owns a computedState. +// Stamped at Page.frameNavigated and precomputed into event payloads/metadata. +type navContext struct { + sessionID string + targetID string + targetType string + frameID string + loaderID string + url string +} diff --git a/server/lib/cdpmonitor/util.go b/server/lib/cdpmonitor/util.go new file mode 100644 index 00000000..2c5b6d34 --- /dev/null +++ b/server/lib/cdpmonitor/util.go @@ -0,0 +1,123 @@ +package cdpmonitor + +import ( + "encoding/json" + "strings" + "unicode/utf8" +) + +// consoleArgString extracts a display string from a CDP console argument. +// For strings it unquotes the JSON value; for other types it returns the raw JSON. +func consoleArgString(a cdpRuntimeRemoteObject) string { + if len(a.Value) == 0 { + return a.Type // e.g. 
"undefined", "null" + } + if a.Type == "string" { + var s string + if json.Unmarshal(a.Value, &s) == nil { + return s + } + } + return string(a.Value) +} + +// isTextualResource reports whether the resource warrants body capture. +// resourceType is checked first; mimeType is a fallback for resources with no type (e.g. in-flight at attach time). +func isTextualResource(resourceType, mimeType string) bool { + switch resourceType { + case "Font", "Image", "Media", "Stylesheet", "Script": + return false + } + return isCapturedMIME(mimeType) +} + +// isCapturedMIME returns true for MIME types whose bodies are worth capturing. +// Uses an allow-list of known textual types; everything else is excluded. +func isCapturedMIME(mime string) bool { + if mime == "" { + return false + } + // Allow plain text subtypes. + if strings.HasPrefix(mime, "text/plain") || + strings.HasPrefix(mime, "text/html") || + strings.HasPrefix(mime, "text/xml") || + strings.HasPrefix(mime, "text/csv") { + return true + } + // Allow structured application types. + if strings.HasPrefix(mime, "application/json") || + strings.HasPrefix(mime, "application/xml") || + strings.HasPrefix(mime, "application/x-www-form-urlencoded") || + strings.HasPrefix(mime, "application/graphql") { + return true + } + // Allow vendor types with text-based suffixes. + if sub, ok := strings.CutPrefix(mime, "application/vnd."); ok { + for _, textSuffix := range []string{"+json", "+xml", "+csv"} { + if strings.HasSuffix(sub, textSuffix) { + return true + } + } + } + return false +} + +// structuredPrefixes lists MIME type prefixes that warrant full (8 KB) body capture. +var structuredPrefixes = []string{ + "application/json", + "application/xml", + "application/x-www-form-urlencoded", + "application/graphql", + "text/xml", + "text/csv", +} + +// bodyCapFor returns the max body capture size for a MIME type. +// Structured data (JSON, XML, CSV, form data) gets 8 KB; everything else gets 4 KB. 
+// Vendor types with +json, +xml, or +csv suffixes are also treated as structured, +// matching the allow-list in isCapturedMIME. +func bodyCapFor(mime string) int { + const fullCap = 8 * 1024 + const contextCap = 4 * 1024 + for _, p := range structuredPrefixes { + if strings.HasPrefix(mime, p) { + return fullCap + } + } + // vnd types with +json/+xml/+csv suffix are treated as structured. + for _, suffix := range []string{"+json", "+xml", "+csv"} { + if strings.HasSuffix(mime, suffix) { + return fullCap + } + } + return contextCap +} + +const truncatedSuffix = "...[truncated]" + +// truncateBody caps body at the given limit on a valid UTF-8 boundary. +// The result never splits a multi-byte rune. A truncation suffix is appended +// when the body is cut so callers can distinguish truncated from full content. +func truncateBody(body string, maxBody int) string { + if len(body) <= maxBody { + return body + } + if maxBody <= 0 { + return "" + } + // Reserve room for the truncation suffix within the limit. + cutAt := maxBody - len(truncatedSuffix) + if cutAt <= 0 { + return truncatedSuffix[:maxBody] + } + // Walk forward through complete runes, stopping before we exceed cutAt. + end := 0 + for end < cutAt { + _, size := utf8.DecodeRuneInString(body[end:]) + if end+size > cutAt { + break + } + end += size + } + return body[:end] + truncatedSuffix +}