diff --git a/services/cloud-agent-next/wrapper/src/restore-session.test.ts b/services/cloud-agent-next/wrapper/src/restore-session.test.ts index b20f972dfa..a7ce55efed 100644 --- a/services/cloud-agent-next/wrapper/src/restore-session.test.ts +++ b/services/cloud-agent-next/wrapper/src/restore-session.test.ts @@ -22,6 +22,30 @@ function makeSnapshot(diffs: Array<{ file: string; after: string; status: string }); } +function makeSessionDiffSnapshot(patch: string): string { + return JSON.stringify({ + info: snapshotInfo(), + sessionDiff: [ + { + file: 'src/index.ts', + patch, + additions: 1, + deletions: 0, + status: 'modified', + }, + ], + messages: [ + { + info: { + summary: { + diffs: [{ file: 'legacy.txt', after: 'legacy content', status: 'modified' }], + }, + }, + }, + ], + }); +} + function makeMultiMessageSnapshot( ...messageDiffs: Array> ): string { @@ -403,6 +427,68 @@ describe('restoreSession', () => { expect(fs.existsSync(path.join(workspace, 'old-file.txt'))).toBe(false); }); + it('prefers top-level patch session diffs over legacy message summaries', async () => { + const repo = path.join(tmpDir, 'repo'); + fs.mkdirSync(path.join(repo, 'src'), { recursive: true }); + Bun.spawnSync(['git', 'init'], { cwd: repo, stdout: 'pipe', stderr: 'pipe' }); + Bun.spawnSync(['git', 'config', 'user.email', 'test@example.com'], { + cwd: repo, + stdout: 'pipe', + stderr: 'pipe', + }); + Bun.spawnSync(['git', 'config', 'user.name', 'Test User'], { + cwd: repo, + stdout: 'pipe', + stderr: 'pipe', + }); + fs.writeFileSync(path.join(repo, 'src/index.ts'), 'before\n'); + Bun.spawnSync(['git', 'add', '.'], { cwd: repo, stdout: 'pipe', stderr: 'pipe' }); + Bun.spawnSync(['git', 'commit', '-m', 'initial'], { + cwd: repo, + stdout: 'pipe', + stderr: 'pipe', + }); + fs.writeFileSync(path.join(repo, 'src/index.ts'), 'after\n'); + const proc = Bun.spawnSync(['git', 'diff', '--src-prefix=a/', '--dst-prefix=b/'], { + cwd: repo, + stdout: 'pipe', + stderr: 'pipe', + }); + const patch = new TextDecoder().decode(proc.stdout); + + fs.mkdirSync(path.join(workspace, 'src'), { recursive: true }); + Bun.spawnSync(['git', 'init'], { cwd: workspace, stdout: 'pipe', stderr: 'pipe' }); + Bun.spawnSync(['git', 'config', 'user.email', 'test@example.com'], { + cwd: workspace, + stdout: 'pipe', + stderr: 'pipe', + }); + Bun.spawnSync(['git', 'config', 'user.name', 'Test User'], { + cwd: workspace, + stdout: 'pipe', + stderr: 'pipe', + }); + fs.writeFileSync(path.join(workspace, 'src/index.ts'), 'before\n'); + Bun.spawnSync(['git', 'add', '.'], { cwd: workspace, stdout: 'pipe', stderr: 'pipe' }); + Bun.spawnSync(['git', 'commit', '-m', 'initial'], { + cwd: workspace, + stdout: 'pipe', + stderr: 'pipe', + }); + mockFetchOk(makeSessionDiffSnapshot(patch)); + + const result = await restoreSession(SESSION_ID, workspace); + + expect(result).toEqual({ + ok: true, + downloaded: true, + imported: true, + diffs: { applied: 1, skipped: 0, total: 1 }, + }); + expect(fs.readFileSync(path.join(workspace, 'src/index.ts'), 'utf-8')).toBe('after\n'); + expect(fs.existsSync(path.join(workspace, 'legacy.txt'))).toBe(false); + }); + it('succeeds with zero diffs when messages array is empty', async () => { mockFetchOk(JSON.stringify({ info: snapshotInfo(), messages: [] })); diff --git a/services/cloud-agent-next/wrapper/src/restore-session.ts b/services/cloud-agent-next/wrapper/src/restore-session.ts index e98bf11d12..e48fc40477 100644 --- a/services/cloud-agent-next/wrapper/src/restore-session.ts +++ b/services/cloud-agent-next/wrapper/src/restore-session.ts @@ -1,4 +1,5 @@ import fs from 'node:fs'; +import os from 'node:os'; import path from 'node:path'; import { logToFile, runProcess } from './utils.js'; @@ -17,7 +18,8 @@ export type RestoreResult = type SnapshotDiff = { file: string; - after: string; + after?: string; + patch?: string; status: string; }; @@ -345,7 +347,7 @@ async function validateSnapshotInfoId(snapshotPath: string): Promise 0 then .sessionDiff[] else (.messages[]?.info.summary | objects | .diffs[]? // empty) end) as $d ({}; if (($d.file? | type) == "string") then .[$d.file] = $d else . end) | [.[]]'; /** * Extract last-write-wins diffs from a snapshot file. Prefers a jq subprocess @@ -387,6 +389,7 @@ export async function extractDiffs(snapshotPath: string): Promise { type SnapshotShape = { + sessionDiff?: SnapshotDiff[]; messages?: Array<{ info?: { summary?: { diffs?: SnapshotDiff[] }; @@ -401,6 +404,12 @@ async function extractDiffsWithBun(snapshotPath: string): Promise(); + if (Array.isArray(parsed.sessionDiff) && parsed.sessionDiff.length > 0) { + for (const diff of parsed.sessionDiff) { + if (diff && typeof diff.file === 'string') dedup.set(diff.file, diff); + } + return Array.from(dedup.values()); + } for (const message of parsed.messages ?? []) { const summary = message?.info?.summary; if (!summary || typeof summary !== 'object') continue; @@ -411,6 +420,26 @@ async function extractDiffsWithBun(snapshotPath: string): Promise { } } - controller.enqueue(encoder.encode(']}')); + controller.enqueue(encoder.encode(']')); + controller.enqueue(encoder.encode(',"sessionDiff":')); + const diffRow = db + .select({ + item_data: ingestItems.item_data, + item_data_r2_key: ingestItems.item_data_r2_key, + }) + .from(ingestItems) + .where(eq(ingestItems.item_type, 'session_diff')) + .limit(1) + .get(); + if (diffRow) { + await enqueueItemData(controller, diffRow, r2, encoder, '[]'); + } else { + controller.enqueue(encoder.encode('[]')); + } + controller.enqueue(encoder.encode('}')); controller.close(); } catch (err) { controller.error(err); @@ -588,7 +604,8 @@ async function enqueueItemData( controller: ReadableStreamDefaultController, ref: ItemDataRef, r2: R2Bucket, - encoder: TextEncoder + encoder: TextEncoder, + missingFallback = '{}' ): Promise { if (ref.item_data_r2_key) { const obj = await r2.get(ref.item_data_r2_key); @@ -600,10 +617,10 @@ async function enqueueItemData( controller.enqueue(result.value); } } else { - console.error('R2 blob missing during export, falling back to empty object', { + console.error('R2 blob missing during export, using fallback item data', { r2Key: ref.item_data_r2_key, }); - controller.enqueue(encoder.encode('{}')); + controller.enqueue(encoder.encode(missingFallback)); } } else { controller.enqueue(encoder.encode(ref.item_data)); diff --git a/services/session-ingest/test/integration/session-ingest-do.test.ts b/services/session-ingest/test/integration/session-ingest-do.test.ts index 0ddda7e16b..d770813243 100644 --- a/services/session-ingest/test/integration/session-ingest-do.test.ts +++ b/services/session-ingest/test/integration/session-ingest-do.test.ts @@ -37,6 +37,62 @@ describe('SessionIngestDO integration', () => { expect(snapshot.info).toEqual({ title: 'Test Session' }); expect(snapshot.messages).toEqual([]); + expect(snapshot.sessionDiff).toEqual([]); + }); + + it('exports the latest session diff as a top-level field', async () => { + const sessionId = 'ses_roundtrip_diff_000000004'; + const stub = getStub(kiloUserId, sessionId); + const diffs = [ + { + file: 'src/index.ts', + patch: 'diff --git a/src/index.ts b/src/index.ts\n', + additions: 1, + deletions: 0, + status: 'modified', + }, + ]; + + await stub.ingest( + [ + { type: 'session', data: { title: 'Session Diff Export' } }, + { type: 'session_diff', data: diffs }, + ], + kiloUserId, + sessionId, + 1 + ); + + const raw = await stub.getAllStream().then(s => new Response(s).text()); + const snapshot = JSON.parse(raw); + + expect(snapshot.info).toEqual({ title: 'Session Diff Export' }); + expect(snapshot.messages).toEqual([]); + expect(snapshot.sessionDiff).toEqual(diffs); + }); + + it('exports an empty array for a missing R2-backed session diff', async () => { + const sessionId = 'ses_roundtrip_diff_missing_r2_001'; + const stub = getStub(kiloUserId, sessionId); + + await stub.ingest( + [ + { type: 'session', data: { title: 'Missing R2 Diff Export' } }, + { type: 'session_diff', data: [{ file: 'missing.txt', additions: 1, deletions: 0 }] }, + ], + kiloUserId, + sessionId, + 1, + 1000, + { session_diff: `items/${sessionId}/session_diff/missing` } + ); + + const raw = await stub.getAllStream().then(s => new Response(s).text()); + const snapshot = JSON.parse(raw); + + expect(snapshot.info).toEqual({ title: 'Missing R2 Diff Export' }); + expect(snapshot.messages).toEqual([]); + expect(snapshot.sessionDiff).toEqual([]); }); it('ingests multiple items and exports a full snapshot', async () => { @@ -2182,6 +2238,41 @@ describe('SessionIngestDO integration', () => { expect(snapshot.info).toEqual({ title: 'Big Session Info' }); }); + + it('exports R2-backed session diffs with resolved data', async () => { + const sessionId = 'ses_r2_backed_diff_0000016'; + const stub = getStub(kiloUserId, sessionId); + const diffs = [ + { + file: 'large.txt', + patch: 'diff --git a/large.txt b/large.txt\n', + additions: 1, + deletions: 0, + status: 'modified', + }, + ]; + const itemData = JSON.stringify(diffs); + const r2Key = `items/${kiloUserId}/${sessionId}/session_diff/3000`; + await env.SESSION_INGEST_R2.put(r2Key, itemData); + + await stub.ingest( + [ + { type: 'session', data: { title: 'R2 Diff Test' } }, + { type: 'session_diff', data: diffs }, + ], + kiloUserId, + sessionId, + 1, + 3000, + { session_diff: r2Key } + ); + + const raw = await stub.getAllStream().then(s => new Response(s).text()); + const snapshot = JSON.parse(raw); + + expect(snapshot.info).toEqual({ title: 'R2 Diff Test' }); + expect(snapshot.sessionDiff).toEqual(diffs); + }); }); describe('timestamp guard', () => {