Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion services/cloud-agent-next/src/execution/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ export type RetryableErrorCode =
| 'SANDBOX_CONNECT_FAILED' // Sandbox may be waking up or network issue
| 'WORKSPACE_SETUP_FAILED' // Git clone/network transient failure
| 'KILO_SERVER_FAILED' // Kilo server starting up
| 'WRAPPER_START_FAILED'; // Wrapper process starting
| 'WRAPPER_START_FAILED' // Wrapper process starting
| 'WRAPPER_FINALIZING'; // Wrapper sealed the current run before this delivery

/**
* Error codes for non-retryable failures (4xx/5xx).
Expand Down
25 changes: 25 additions & 0 deletions services/cloud-agent-next/src/execution/orchestrator.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import type { AgentSandbox, WrapperInstanceLease } from '../agent-sandbox/protocol.js';
import type { Env } from '../types.js';
import { WrapperError } from '../kilo/wrapper-client.js';
import type { ExecutionError } from './errors.js';
import type { FencedWrapperDispatchRequest } from './types.js';
import {
Expand Down Expand Up @@ -236,6 +237,30 @@ describe('ExecutionOrchestrator AgentSandbox delivery', () => {
} satisfies Partial<ExecutionError>);
});

it('preserves a finalizing error from wrapper startup', async () => {
const { orchestrator, ensureWrapper } = createOrchestrator();
const finalizingError = new WrapperError(
'Wrapper batch is finalizing',
'WRAPPER_FINALIZING',
409
);
ensureWrapper.mockRejectedValueOnce(finalizingError);

await expect(orchestrator.execute(basePlan)).rejects.toBe(finalizingError);
});

it('preserves a finalizing error from wrapper dispatch', async () => {
const { orchestrator, prompt } = createOrchestrator();
const finalizingError = new WrapperError(
'Wrapper batch is finalizing',
'WRAPPER_FINALIZING',
409
);
prompt.mockRejectedValueOnce(finalizingError);

await expect(orchestrator.execute(basePlan)).rejects.toBe(finalizingError);
});

it('does not recover the shared sandbox for plain capacity admission rejection', async () => {
const { orchestrator, ensureWrapper, deleteSandbox } = createOrchestrator();
ensureWrapper.mockRejectedValueOnce(
Expand Down
4 changes: 4 additions & 0 deletions services/cloud-agent-next/src/execution/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ export class ExecutionOrchestrator {
});
} catch (error) {
if (error instanceof ExecutionError) throw error;
if (error instanceof WrapperError && error.code === 'WRAPPER_FINALIZING') throw error;
throw ExecutionError.wrapperStartFailed(
`Failed to start wrapper: ${error instanceof Error ? error.message : String(error)}`,
error
Expand Down Expand Up @@ -210,6 +211,9 @@ export class ExecutionOrchestrator {
if (error.code === 'KILO_SERVER_FAILED') {
throw ExecutionError.kiloServerFailed(error.message, error);
}
if (error.code === 'WRAPPER_FINALIZING') {
throw error;
}
}
if (error instanceof ExecutionError) throw error;
throw ExecutionError.wrapperStartFailed(
Expand Down
3 changes: 2 additions & 1 deletion services/cloud-agent-next/src/execution/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ export type RetryableResultCode =
| 'SANDBOX_CONNECT_FAILED'
| 'WORKSPACE_SETUP_FAILED'
| 'KILO_SERVER_FAILED'
| 'WRAPPER_START_FAILED';
| 'WRAPPER_START_FAILED'
| 'WRAPPER_FINALIZING';

export type AdmissionFailure = {
success: false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ function promptAdmissionError(
case 'WORKSPACE_SETUP_FAILED':
case 'KILO_SERVER_FAILED':
case 'WRAPPER_START_FAILED':
case 'WRAPPER_FINALIZING':
return facadeError(503, result.code, result.error);
case 'INTERNAL':
return facadeError(500, 'KILO_PROMPT_ADMISSION_FAILED', result.error);
Expand Down
49 changes: 49 additions & 0 deletions services/cloud-agent-next/src/kilo/wrapper-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
WrapperClient,
WrapperContainerClient,
WrapperError,
WrapperFinalizingError,
WrapperNotReadyError,
WrapperNoJobError,
WrapperJobConflictError,
Expand Down Expand Up @@ -361,6 +362,14 @@ describe('WrapperClient', () => {
expect(result.lastError).toBeDefined();
expect(result.lastError?.code).toBe('INFLIGHT_TIMEOUT');
});

it('returns finalizing status', async () => {
const statusResponse: JobStatus = { state: 'finalizing', sessionId: 'kilo_456' };
const session = createMockSession(createSuccessResponse(statusResponse));
const client = new WrapperClient({ session, port: defaultPort });

await expect(client.status()).resolves.toEqual(statusResponse);
});
});

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -396,6 +405,41 @@ describe('WrapperClient', () => {
expect(result.messageId).toBeUndefined();
});

it('parses finalizing responses with the wrapper run identity', async () => {
const session = createMockSession({
exitCode: 0,
stdout: JSON.stringify({
error: 'WRAPPER_FINALIZING',
message: 'Wrapper batch is finalizing',
wrapperRunId: 'wr_old',
}),
});
const client = new WrapperClient({ session, port: defaultPort });

await expect(client.prompt(createPromptOptions())).rejects.toEqual(
expect.objectContaining({
name: 'WrapperFinalizingError',
code: 'WRAPPER_FINALIZING',
wrapperRunId: 'wr_old',
})
);
});

it('parses legacy finalizing responses without a wrapper run identity', async () => {
const session = createMockSession(
createErrorResponse('WRAPPER_FINALIZING', 'Wrapper batch is finalizing')
);
const client = new WrapperClient({ session, port: defaultPort });

await expect(client.prompt(createPromptOptions())).rejects.toEqual(
expect.objectContaining({
name: 'WrapperFinalizingError',
code: 'WRAPPER_FINALIZING',
wrapperRunId: undefined,
})
);
});

it('sends prompt text', async () => {
const session = createMockSession(
createSuccessResponse({ status: 'sent', messageId: 'msg_1' })
Expand Down Expand Up @@ -2074,6 +2118,11 @@ describe('WrapperClient', () => {
// -------------------------------------------------------------------------

describe('error classes', () => {
it('WrapperFinalizingError carries optional wrapper run identity', () => {
expect(new WrapperFinalizingError('finalizing', 'wr_old').wrapperRunId).toBe('wr_old');
expect(new WrapperFinalizingError('legacy').wrapperRunId).toBeUndefined();
});

it('WrapperError has correct properties', () => {
const error = new WrapperError('Test message', 'TEST_CODE', 500);

Expand Down
20 changes: 19 additions & 1 deletion services/cloud-agent-next/src/kilo/wrapper-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export type WrapperPty = {
};

export type JobStatus = {
state: 'idle' | 'active';
state: 'idle' | 'active' | 'finalizing';
sessionId?: string;
lastError?: {
code: string;
Expand Down Expand Up @@ -168,6 +168,16 @@ export class WrapperError extends Error {
}
}

export class WrapperFinalizingError extends WrapperError {
constructor(
message: string,
public readonly wrapperRunId?: string
) {
super(message, 'WRAPPER_FINALIZING', 409);
this.name = 'WrapperFinalizingError';
}
}

export class WrapperNotReadyError extends WrapperError {
constructor(message: string, options?: ErrorOptions) {
super(message, 'NOT_READY', 503, options);
Expand Down Expand Up @@ -198,6 +208,7 @@ const ERROR_STATUS_CODES: Record<string, number> = {
WORKSPACE_SETUP_FAILED: 503,
KILO_SERVER_FAILED: 503,
SEND_ERROR: 500,
WRAPPER_FINALIZING: 409,
};

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -469,6 +480,7 @@ export class WrapperClient {
error?: string;
message?: string;
retryable?: boolean;
wrapperRunId?: string;
};

// Check for error response
Expand All @@ -485,6 +497,12 @@ export class WrapperClient {
if (errorCode === 'JOB_CONFLICT') {
throw new WrapperJobConflictError(parsed.message ?? 'Job conflict');
}
if (errorCode === 'WRAPPER_FINALIZING') {
throw new WrapperFinalizingError(
parsed.message ?? 'Wrapper batch is finalizing',
parsed.wrapperRunId
);
}

throw new WrapperError(parsed.message ?? errorCode, errorCode, statusCode);
}
Expand Down
17 changes: 13 additions & 4 deletions services/cloud-agent-next/src/persistence/CloudAgentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ import {
clearWrapperRuntimeIdentity,
getWrapperLease,
getWrapperRuntimeState,
isWrapperDeliveryHeld,
isWrapperRunFinalizing,
nextWrapperCleanupDeadline,
nextWrapperLeaseDeadline,
} from '../session/wrapper-runtime-state.js';
Expand Down Expand Up @@ -494,10 +496,7 @@ export class CloudAgentSession extends DurableObject<WorkerEnv> {
...event,
});
},
hasObservedWrapperIdle: async () => {
const state = await getWrapperRuntimeState(this.ctx.storage);
return state.lastWrapperIdleAt !== undefined;
},
hasObservedWrapperIdle: async () => false,
requestAlarmAtOrBefore: deadline => this.scheduleAlarmAtOrBefore(deadline),
getSessionIdForLogs: () => this.sessionId,
});
Expand Down Expand Up @@ -549,6 +548,8 @@ export class CloudAgentSession extends DurableObject<WorkerEnv> {
hasActiveIngestConnection: async params =>
(await this.getIngestHandler()).hasActiveConnection(params),
clearInterruptRequest: () => this.executionQueries.clearInterrupt(),
ensureAcceptedMessageBeforeTerminal: (messageId, wrapperRunId) =>
this.ensureAcceptedMessageBeforeTerminal(messageId, wrapperRunId),
stopWrappers: async request => {
if (this.physicalWrapperStopper) return this.physicalWrapperStopper(request);
if (this.orchestrator || (!this.env.Sandbox && !this.env.SandboxSmall)) {
Expand Down Expand Up @@ -604,6 +605,8 @@ export class CloudAgentSession extends DurableObject<WorkerEnv> {
return retryAt === undefined ? null : { retryAt };
},
deliver: plan => this.executeDirectly(plan),
isDeliveryHeld: async () =>
isWrapperRunFinalizing(await getWrapperRuntimeState(this.ctx.storage)),
ensureQueuedMessageEvent: event => {
this.ensureQueuedMessageEvent({
executionId: '' as EventSourceId,
Expand Down Expand Up @@ -2013,9 +2016,14 @@ export class CloudAgentSession extends DurableObject<WorkerEnv> {
nextAlarmAt = existingAlarm;
}

const pendingDeliveryHeld = isWrapperDeliveryHeld(
await getWrapperRuntimeState(this.ctx.storage),
await getWrapperLease(this.ctx.storage)
);
if (
pendingFlushRetryAt === undefined &&
pendingCount > 0 &&
!pendingDeliveryHeld &&
currentTime + PENDING_FLUSH_DEBOUNCE_MS < nextAlarmAt
) {
nextAlarmAt = currentTime + PENDING_FLUSH_DEBOUNCE_MS;
Expand Down Expand Up @@ -2918,6 +2926,7 @@ export class CloudAgentSession extends DurableObject<WorkerEnv> {
status: 'completed' | 'failed' | 'interrupted';
error?: string;
gateResult?: 'pass' | 'fail';
messageIds?: string[];
}): Promise<void> {
await this.resolveSessionId();
await this.getWrapperSupervisor().onTerminalEvent(params);
Expand Down
Loading