Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions services/cloud-agent-next/wrapper/src/restore-session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,30 @@ function makeSnapshot(diffs: Array<{ file: string; after: string; status: string
});
}

function makeSessionDiffSnapshot(patch: string): string {
return JSON.stringify({
info: snapshotInfo(),
sessionDiff: [
{
file: 'src/index.ts',
patch,
additions: 1,
deletions: 0,
status: 'modified',
},
],
messages: [
{
info: {
summary: {
diffs: [{ file: 'legacy.txt', after: 'legacy content', status: 'modified' }],
},
},
},
],
});
}

function makeMultiMessageSnapshot(
...messageDiffs: Array<Array<{ file: string; after: string; status: string }>>
): string {
Expand Down Expand Up @@ -403,6 +427,68 @@ describe('restoreSession', () => {
expect(fs.existsSync(path.join(workspace, 'old-file.txt'))).toBe(false);
});

it('prefers top-level patch session diffs over legacy message summaries', async () => {
const repo = path.join(tmpDir, 'repo');
fs.mkdirSync(path.join(repo, 'src'), { recursive: true });
Bun.spawnSync(['git', 'init'], { cwd: repo, stdout: 'pipe', stderr: 'pipe' });
Bun.spawnSync(['git', 'config', 'user.email', 'test@example.com'], {
cwd: repo,
stdout: 'pipe',
stderr: 'pipe',
});
Bun.spawnSync(['git', 'config', 'user.name', 'Test User'], {
cwd: repo,
stdout: 'pipe',
stderr: 'pipe',
});
fs.writeFileSync(path.join(repo, 'src/index.ts'), 'before\n');
Bun.spawnSync(['git', 'add', '.'], { cwd: repo, stdout: 'pipe', stderr: 'pipe' });
Bun.spawnSync(['git', 'commit', '-m', 'initial'], {
cwd: repo,
stdout: 'pipe',
stderr: 'pipe',
});
fs.writeFileSync(path.join(repo, 'src/index.ts'), 'after\n');
const proc = Bun.spawnSync(['git', 'diff', '--src-prefix=a/', '--dst-prefix=b/'], {
cwd: repo,
stdout: 'pipe',
stderr: 'pipe',
});
const patch = new TextDecoder().decode(proc.stdout);

fs.mkdirSync(path.join(workspace, 'src'), { recursive: true });
Bun.spawnSync(['git', 'init'], { cwd: workspace, stdout: 'pipe', stderr: 'pipe' });
Bun.spawnSync(['git', 'config', 'user.email', 'test@example.com'], {
cwd: workspace,
stdout: 'pipe',
stderr: 'pipe',
});
Bun.spawnSync(['git', 'config', 'user.name', 'Test User'], {
cwd: workspace,
stdout: 'pipe',
stderr: 'pipe',
});
fs.writeFileSync(path.join(workspace, 'src/index.ts'), 'before\n');
Bun.spawnSync(['git', 'add', '.'], { cwd: workspace, stdout: 'pipe', stderr: 'pipe' });
Bun.spawnSync(['git', 'commit', '-m', 'initial'], {
cwd: workspace,
stdout: 'pipe',
stderr: 'pipe',
});
mockFetchOk(makeSessionDiffSnapshot(patch));

const result = await restoreSession(SESSION_ID, workspace);

expect(result).toEqual({
ok: true,
downloaded: true,
imported: true,
diffs: { applied: 1, skipped: 0, total: 1 },
});
expect(fs.readFileSync(path.join(workspace, 'src/index.ts'), 'utf-8')).toBe('after\n');
expect(fs.existsSync(path.join(workspace, 'legacy.txt'))).toBe(false);
});

it('succeeds with zero diffs when messages array is empty', async () => {
mockFetchOk(JSON.stringify({ info: snapshotInfo(), messages: [] }));

Expand Down
42 changes: 40 additions & 2 deletions services/cloud-agent-next/wrapper/src/restore-session.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { logToFile, runProcess } from './utils.js';

Expand All @@ -17,7 +18,8 @@ export type RestoreResult =

type SnapshotDiff = {
file: string;
after: string;
after?: string;
patch?: string;
status: string;
};

Expand Down Expand Up @@ -345,7 +347,7 @@ async function validateSnapshotInfoId(snapshotPath: string): Promise<SnapshotInf
// ~half the memory of a V8 heap.
// `objects` filters out non-object .summary values (e.g. compaction messages set summary=true)
const JQ_EXTRACT_DIFFS_FILTER =
'reduce (.messages[]?.info.summary | objects | .diffs[]? // empty) as $d ({}; .[$d.file] = $d) | [.[]]';
'reduce (if ((.sessionDiff? // []) | length) > 0 then .sessionDiff[] else (.messages[]?.info.summary | objects | .diffs[]? // empty) end) as $d ({}; if (($d.file? | type) == "string") then .[$d.file] = $d else . end) | [.[]]';

/**
* Extract last-write-wins diffs from a snapshot file. Prefers a jq subprocess
Expand Down Expand Up @@ -387,6 +389,7 @@ export async function extractDiffs(snapshotPath: string): Promise<SnapshotDiff[]
*/
async function extractDiffsWithBun(snapshotPath: string): Promise<SnapshotDiff[] | null> {
type SnapshotShape = {
sessionDiff?: SnapshotDiff[];
messages?: Array<{
info?: {
summary?: { diffs?: SnapshotDiff[] };
Expand All @@ -401,6 +404,12 @@ async function extractDiffsWithBun(snapshotPath: string): Promise<SnapshotDiff[]
return null;
}
const dedup = new Map<string, SnapshotDiff>();
if (Array.isArray(parsed.sessionDiff) && parsed.sessionDiff.length > 0) {
for (const diff of parsed.sessionDiff) {
if (diff && typeof diff.file === 'string') dedup.set(diff.file, diff);
}
return Array.from(dedup.values());
}
for (const message of parsed.messages ?? []) {
const summary = message?.info?.summary;
if (!summary || typeof summary !== 'object') continue;
Expand All @@ -411,6 +420,26 @@ async function extractDiffsWithBun(snapshotPath: string): Promise<SnapshotDiff[]
return Array.from(dedup.values());
}

function applyPatch(workspacePath: string, diff: SnapshotDiff): boolean {
if (!diff.patch) return false;
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'kilo-session-diff-'));
const file = path.join(dir, 'change.patch');
try {
fs.writeFileSync(file, diff.patch);
const proc = Bun.spawnSync(['git', 'apply', '--3way', '--whitespace=nowarn', file], {
cwd: workspacePath,
stdout: 'pipe',
stderr: 'pipe',
});
if (proc.exitCode === 0) return true;
const stderr = new TextDecoder().decode(proc.stderr).trim();
log(`git apply failed file=${diff.file} stderr=${stderr}`);
return false;
} finally {
fs.rmSync(dir, { recursive: true, force: true });
}
}

// ---------------------------------------------------------------------------
// Main logic
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -565,6 +594,15 @@ export async function restoreSession(
let skipped = 0;

for (const diff of uniqueDiffs) {
if (diff.patch) {
if (applyPatch(workspacePath, diff)) {
applied++;
} else {
skipped++;
}
continue;
}

const fp = path.resolve(resolvedWorkspace, diff.file);

if (!fp.startsWith(resolvedWorkspace + '/')) {
Expand Down
25 changes: 21 additions & 4 deletions services/session-ingest/src/dos/SessionIngestDO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,23 @@ export class SessionIngestDO extends DurableObject<Env> {
}
}

controller.enqueue(encoder.encode(']}'));
controller.enqueue(encoder.encode(']'));
controller.enqueue(encoder.encode(',"sessionDiff":'));
const diffRow = db
.select({
item_data: ingestItems.item_data,
item_data_r2_key: ingestItems.item_data_r2_key,
})
.from(ingestItems)
.where(eq(ingestItems.item_type, 'session_diff'))
.limit(1)
.get();
if (diffRow) {
await enqueueItemData(controller, diffRow, r2, encoder, '[]');
} else {
controller.enqueue(encoder.encode('[]'));
}
controller.enqueue(encoder.encode('}'));
controller.close();
} catch (err) {
controller.error(err);
Expand Down Expand Up @@ -588,7 +604,8 @@ async function enqueueItemData(
controller: ReadableStreamDefaultController<Uint8Array>,
ref: ItemDataRef,
r2: R2Bucket,
encoder: TextEncoder
encoder: TextEncoder,
missingFallback = '{}'
): Promise<void> {
if (ref.item_data_r2_key) {
const obj = await r2.get(ref.item_data_r2_key);
Expand All @@ -600,10 +617,10 @@ async function enqueueItemData(
controller.enqueue(result.value);
}
} else {
console.error('R2 blob missing during export, falling back to empty object', {
console.error('R2 blob missing during export, using fallback item data', {
r2Key: ref.item_data_r2_key,
});
controller.enqueue(encoder.encode('{}'));
controller.enqueue(encoder.encode(missingFallback));
}
} else {
controller.enqueue(encoder.encode(ref.item_data));
Expand Down
91 changes: 91 additions & 0 deletions services/session-ingest/test/integration/session-ingest-do.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,62 @@ describe('SessionIngestDO integration', () => {

expect(snapshot.info).toEqual({ title: 'Test Session' });
expect(snapshot.messages).toEqual([]);
expect(snapshot.sessionDiff).toEqual([]);
});

it('exports the latest session diff as a top-level field', async () => {
const sessionId = 'ses_roundtrip_diff_000000004';
const stub = getStub(kiloUserId, sessionId);
const diffs = [
{
file: 'src/index.ts',
patch: 'diff --git a/src/index.ts b/src/index.ts\n',
additions: 1,
deletions: 0,
status: 'modified',
},
];

await stub.ingest(
[
{ type: 'session', data: { title: 'Session Diff Export' } },
{ type: 'session_diff', data: diffs },
],
kiloUserId,
sessionId,
1
);

const raw = await stub.getAllStream().then(s => new Response(s).text());
const snapshot = JSON.parse(raw);

expect(snapshot.info).toEqual({ title: 'Session Diff Export' });
expect(snapshot.messages).toEqual([]);
expect(snapshot.sessionDiff).toEqual(diffs);
});

it('exports an empty array for a missing R2-backed session diff', async () => {
const sessionId = 'ses_roundtrip_diff_missing_r2_001';
const stub = getStub(kiloUserId, sessionId);

await stub.ingest(
[
{ type: 'session', data: { title: 'Missing R2 Diff Export' } },
{ type: 'session_diff', data: [{ file: 'missing.txt', additions: 1, deletions: 0 }] },
],
kiloUserId,
sessionId,
1,
1000,
{ session_diff: `items/${sessionId}/session_diff/missing` }
);

const raw = await stub.getAllStream().then(s => new Response(s).text());
const snapshot = JSON.parse(raw);

expect(snapshot.info).toEqual({ title: 'Missing R2 Diff Export' });
expect(snapshot.messages).toEqual([]);
expect(snapshot.sessionDiff).toEqual([]);
});

it('ingests multiple items and exports a full snapshot', async () => {
Expand Down Expand Up @@ -2182,6 +2238,41 @@ describe('SessionIngestDO integration', () => {

expect(snapshot.info).toEqual({ title: 'Big Session Info' });
});

it('exports R2-backed session diffs with resolved data', async () => {
const sessionId = 'ses_r2_backed_diff_0000016';
const stub = getStub(kiloUserId, sessionId);
const diffs = [
{
file: 'large.txt',
patch: 'diff --git a/large.txt b/large.txt\n',
additions: 1,
deletions: 0,
status: 'modified',
},
];
const itemData = JSON.stringify(diffs);
const r2Key = `items/${kiloUserId}/${sessionId}/session_diff/3000`;
await env.SESSION_INGEST_R2.put(r2Key, itemData);

await stub.ingest(
[
{ type: 'session', data: { title: 'R2 Diff Test' } },
{ type: 'session_diff', data: diffs },
],
kiloUserId,
sessionId,
1,
3000,
{ session_diff: r2Key }
);

const raw = await stub.getAllStream().then(s => new Response(s).text());
const snapshot = JSON.parse(raw);

expect(snapshot.info).toEqual({ title: 'R2 Diff Test' });
expect(snapshot.sessionDiff).toEqual(diffs);
});
});

describe('timestamp guard', () => {
Expand Down