# Browser Events Durable Storage Plan

---

## Overview

Browser events captured by the image server (CDP, live view, computer control, captcha) are already written to per-category JSONL files and streamed over SSE. This plan adds a third sink: S2, a cloud append-only log store.

---

## System Context

```
[CDP Monitor]  ──┐
[Computer API] ──┼─► CaptureSession.Publish ──► RingBuffer (fan-out)
[Extension]    ──┤                                  │
[Live View]    ──┘            ┌─────────────────────┼─────────────────────┐
                              │                     │                     │
                         FileWriter            SSE handler      EventsStorageWriter ◄─ (new)
                          (local)              (real-time)           (durable)
```

All three sinks consume from the same ring buffer. The ring buffer is non-blocking: writers never wait for any sink. Each sink holds an independent `Reader` cursor.
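The fan-out contract can be sketched as follows. `Ring` and `Reader` here are illustrative stand-ins, not the actual implementation (the real ring carries event envelopes, not raw bytes): publishers never block, each sink owns its cursor, and a lapped reader skips ahead and reports what it lost.

```go
package main

import "fmt"

// Ring is a fixed-capacity broadcast buffer. Publish never blocks; a
// reader that falls more than len(buf) behind is lapped and skips
// ahead, reporting how many events it missed.
type Ring struct {
	buf  [][]byte
	head uint64 // total number of events ever published
}

func NewRing(size int) *Ring { return &Ring{buf: make([][]byte, size)} }

// Publish overwrites the oldest slot; writers never wait for sinks.
func (r *Ring) Publish(e []byte) {
	r.buf[r.head%uint64(len(r.buf))] = e
	r.head++
}

// Reader is an independent cursor; each sink holds its own.
type Reader struct {
	ring *Ring
	next uint64 // sequence number of the next event to read
}

func (r *Ring) NewReader() *Reader { return &Reader{ring: r} }

// Read returns the next event plus the number of events this reader
// lost to lapping; ok is false when the reader is caught up.
func (rd *Reader) Read() (e []byte, dropped uint64, ok bool) {
	if rd.next >= rd.ring.head {
		return nil, 0, false // caught up
	}
	if size := uint64(len(rd.ring.buf)); rd.ring.head > size {
		if oldest := rd.ring.head - size; rd.next < oldest {
			dropped = oldest - rd.next
			rd.next = oldest
		}
	}
	e = rd.ring.buf[rd.next%uint64(len(rd.ring.buf))]
	rd.next++
	return e, dropped, true
}

func main() {
	ring := NewRing(4)
	fast, slow := ring.NewReader(), ring.NewReader()
	ring.Publish([]byte("a"))
	ring.Publish([]byte("b"))
	e, _, _ := fast.Read()
	fmt.Println(string(e)) // a

	// Lap the slow reader: six total writes into a four-slot ring.
	for _, s := range []string{"c", "d", "e", "f"} {
		ring.Publish([]byte(s))
	}
	e, dropped, _ := slow.Read()
	fmt.Printf("%s dropped=%d\n", e, dropped) // c dropped=2
}
```

The key property for this plan is that a slow or failing durable sink only loses its own events; the file and SSE sinks are unaffected.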

---

## Key Components

### `EventsStorageWriter` (`eventsstorage.go`)

The single goroutine that moves events from the ring to the configured backend. Single-use: `Run(ctx)` blocks until ctx is cancelled, then returns. `Close()` drains in-flight writes and tears down the backend.

### `EventsStorage` (interface in `eventsstorage.go`)

```go
type EventsStorage interface {
	// Append writes data to the named stream. The S2 backend relies on
	// the basin's create-stream-on-append feature.
	Append(ctx context.Context, streamName string, data []byte) error
	// Close flushes pending writes and releases resources.
	Close() error
}
```

This interface is the boundary between `EventsStorageWriter` and any specific backend. The mock implementation used in tests lives exclusively in `eventsstorage_writer_test.go`.

### S2 Storage (`s2storage.go`)

The production `EventsStorage` backed by S2. Lazily creates one S2 producer per capture session ID. The producer map is mutex-protected; `Append` is called serially from `EventsStorageWriter.Run`, but ack goroutines run concurrently.

**Producer lifecycle:** Producers are evicted when their capture session ends via `Remove(streamName string)`, called from the `POST /events/stop` handler. This prevents unbounded accumulation of producers across session cycles on long-running servers.

### `s2Producer`

Bundles one `s2.Producer` with a `sync.WaitGroup` that tracks in-flight ack goroutines. `Close()` calls `wg.Wait()` before closing the producer, ensuring no ack is orphaned.

---

## File Structure

```
server/lib/events/
	eventsstorage.go              # EventsStorage interface + EventsStorageWriter
	eventsstorage_writer_test.go  # Tests via mockBackend — no S2 dependency
	s2storage.go                  # S2 implementation of EventsStorage
```

---

## Architectural Decisions

### 1. Stream name = capture session ID

Each capture session maps to a dedicated stream named by the session UUID. Streams are created automatically on first write, via the basin's create-stream-on-append feature. This means:

- Replaying a session = reading one stream from seq 0
- Concurrent sessions write to separate streams with no coordination

### 2. Lazy producer creation with session-end eviction (S2 backend)

Producers are created on first `Append` for a given stream name and cached until the session ends. `s2storage` exposes a `Remove(streamName)` method that drains and closes the producer for that stream; `POST /events/stop` calls `Remove` after the session is torn down. This prevents the producer map from growing unbounded on long-running servers that cycle through many capture sessions.
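The create-on-first-use and evict-on-stop lifecycle can be sketched as follows; the `producer` type and constructor are placeholders for the real per-stream S2 producer:

```go
package main

import (
	"fmt"
	"sync"
)

// producer is a stand-in for the per-stream S2 producer.
type producer struct{ stream string }

func (p *producer) Close() {} // would drain in-flight acks

// s2storage lazily creates one producer per stream and evicts it when
// the capture session ends. The map is mutex-protected because Remove
// and ack goroutines run concurrently with the writer's Appends.
type s2storage struct {
	mu        sync.Mutex
	producers map[string]*producer
}

func newS2Storage() *s2storage {
	return &s2storage{producers: map[string]*producer{}}
}

// get returns the cached producer, creating it on first use.
func (s *s2storage) get(stream string) *producer {
	s.mu.Lock()
	defer s.mu.Unlock()
	p, ok := s.producers[stream]
	if !ok {
		p = &producer{stream: stream} // created on first Append
		s.producers[stream] = p
	}
	return p
}

// Remove drains and closes the producer for a finished session,
// keeping the map bounded on long-running servers.
func (s *s2storage) Remove(stream string) {
	s.mu.Lock()
	p, ok := s.producers[stream]
	delete(s.producers, stream)
	s.mu.Unlock()
	if ok {
		p.Close() // close outside the lock
	}
}

func main() {
	s := newS2Storage()
	a, b := s.get("session-1"), s.get("session-1")
	fmt.Println(a == b) // true — cached after first use
	s.Remove("session-1")
	fmt.Println(len(s.producers)) // 0
}
```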

### 3. Batching: 100ms linger / 50 records (S2 backend)

The S2 SDK batcher coalesces records before flushing to the network. Both values are configurable via environment variables with the defaults below:

| Env var | Default | Description |
| --- | --- | --- |
| `S2_BATCHER_LINGER_MS` | `100` | Flush delay in milliseconds |
| `S2_BATCHER_MAX_RECORDS` | `50` | Max records per batch |

These are independent of the ring buffer read loop — the writer appends one record per ring Read, and the batcher decides when to flush.

### 4. Feedback loop prevention

`EventsStorageWriter.Run` skips envelopes whose `Event.Type == EventsStorageError`. Without this, an error event emitted by the writer would re-enter the ring, be read again by `EventsStorageWriter`, and cause churn. The constant `EventsStorageError` is defined in `event.go` and used in both the writer and the error-emit path to prevent typo-driven breakage.

### 5. 1MB record size limit and truncation

The S2 1MB per-record limit is enforced at `CaptureSession.Publish`, not at the EventsStorageWriter. `truncateIfNeeded` nulls `event.data` and sets `event.truncated=true` when the marshalled envelope exceeds `maxRecordBytes`. This ensures truncation applies equally to file logs and durable records.

### 6. Shutdown sequencing

Shutdown must be strictly ordered to avoid writing to a closed `FileWriter`:

```
1. ctx cancelled (SIGINT/SIGTERM)
2. EventsStorageWriter.Run returns (reader unblocks from cancelled ctx)
3. storageDone channel closes
4. storageWriter.Close() — drains in-flight S2 writes
5. apiService.Shutdown() — closes CaptureSession (and its FileWriter)
```

---

## API Surface Changes

The storage writer runs as a background goroutine and is **not part of the OpenAPI surface**. Only two existing endpoints change behaviour; all others are unaffected.

| Endpoint | Service | Change |
| --- | --- | --- |
| `POST /events/stop` | image server | After stopping, call `s2storage.Remove(captureSessionID)` to evict the producer. No request/response change. Requires a new `CaptureSession.CurrentID() string` accessor. |
| `POST /events/capture_session` | kernel server | After the image server returns a capture session ID, the kernel server writes that ID to `sessions.s2_stream`. No request/response change to the image server endpoint itself. |

---

## Wiring into `main.go`

Three additions to `main.go`:

**1. Conditional construction**: S2 is enabled when `S2_BASIN` and `S2_TOKEN` are both present. If either is empty the storage writer is not started and the server behaves exactly as before.

```
config.S2Basin != "" && config.S2Token != ""  →  build s2storage + EventsStorageWriter
otherwise                                     →  no-op (nil storageWriter)
```
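A sketch of that guard; the `Config` field names follow this plan, while the constructor below is a hypothetical placeholder for the real wiring:

```go
package main

import "fmt"

// Config mirrors the envconfig fields this plan adds.
type Config struct {
	S2Basin string
	S2Token string
}

type EventsStorageWriter struct{ basin string }

// newStorageWriter returns nil unless both S2 variables are set, so the
// caller can skip launching the storage goroutine entirely.
func newStorageWriter(cfg Config) *EventsStorageWriter {
	if cfg.S2Basin == "" || cfg.S2Token == "" {
		return nil // no-op: server behaves exactly as before
	}
	return &EventsStorageWriter{basin: cfg.S2Basin}
}

func main() {
	w := newStorageWriter(Config{}) // neither var set
	fmt.Println(w == nil)           // true — writer not started
	w = newStorageWriter(Config{S2Basin: "events", S2Token: "tok"})
	fmt.Println(w == nil) // false
}
```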

**2. Goroutine launch**: immediately after the HTTP servers are started:

```go
storageDone := make(chan struct{})
go func() {
	defer close(storageDone)
	storageWriter.Run(ctx) // blocks until ctx cancelled
}()
```

**3. Shutdown ordering**: the existing `errgroup` in main already waits for `apiService.Shutdown`. The storage writer must drain before that:

```
ctx cancelled
  → storageWriter.Run returns (ring reader unblocks)
  → storageDone closes
  → storageWriter.Close()    (drains in-flight S2 acks)
  → apiService.Shutdown()    (closes CaptureSession + FileWriter)
```

This is implemented by waiting on `storageDone` before calling `apiService.Shutdown`, outside the errgroup.

---

## Credentials and Configuration

**In the image server (**`config.go`**):**

```go
S2Basin           string `envconfig:"S2_BASIN" default:""`
S2Token           string `envconfig:"S2_TOKEN" default:""`
S2BatcherLingerMs int    `envconfig:"S2_BATCHER_LINGER_MS" default:"100"`
S2BatcherMaxRecs  int    `envconfig:"S2_BATCHER_MAX_RECORDS" default:"50"`
```

**In the VM (infra):**

| Vault variable | Purpose |
| --- | --- |
| `vault_s2_events_basin_production` | S2 basin for browser event capture (prod) |
| `vault_s2_events_basin_staging` | S2 basin for browser event capture (staging) |
| `vault_s2_events_token_production` | Access token for the above basin (prod) |
| `vault_s2_events_token_staging` | Access token for the above basin (staging) |

---

## Session Discoverability

### New column: `s2_stream` on `Sessions`

The capture session ID (= S2 stream name) must survive after the image server container is destroyed so that callers can replay events later. The kernel server stores it on the `Sessions` row at capture session start.

**Schema change** (mirrors `replay_prefix` in `kernel/packages/api/ent/schema/session.go`):

```go
field.String("s2_stream").Optional()
```

**Write path** — after `POST /events/capture_session` succeeds, the kernel server sets `s2_stream` to the returned capture session ID on the active `Sessions` row.

**Read path** — to replay a session, query `sessions.s2_stream` for the session ID, then read the S2 stream by that name from seq 0.

---

## Testing

`EventsStorageWriter` is tested exclusively through `mockBackend` (defined in `eventsstorage_writer_test.go`). Test cases cover:

| Scenario | What is verified |
| --- | --- |
| Normal append | Records routed to correct stream, deserialise back to `Envelope` |
| Ring buffer overflow (dropped) | Writer logs warning and skips; no crash |
| `Append` error | `publishFn` receives exactly one `system_durable_error` event |
| Context cancelled | `Run` returns `nil` (clean shutdown) |
| `EventsStorageError` skipped | Error events not re-submitted, preventing feedback loops |
| Marshal failure (oversized) | Writer skips and continues; next event is processed normally |

---