From 49e8d843b06d50981ca2265e520ab23bc14db218 Mon Sep 17 00:00:00 2001 From: Matthew O'Riordan Date: Fri, 13 Mar 2026 18:16:19 +0100 Subject: [PATCH 1/2] feat: add MessageMaterializer POC plugin Proof of concept for a convenience layer over message.append that automatically accumulates appended data, handles late-join via getMessage(), and provides toPartialJSON() for rendering incomplete JSON during AI/LLM token streaming. Includes vendored partial-json parser, integration tests (27 passing), esbuild config, and a runnable demo script. --- examples/materializer-demo.ts | 219 +++++++ grunt/esbuild/build.js | 17 + src/plugins/materializer/README.md | 133 ++++ src/plugins/materializer/index.ts | 3 + .../materializer/messagematerializer.ts | 362 ++++++++++ src/plugins/materializer/partial-json.ts | 415 ++++++++++++ test/common/globals/named_dependencies.js | 4 + test/realtime/materializer.test.js | 617 ++++++++++++++++++ 8 files changed, 1770 insertions(+) create mode 100644 examples/materializer-demo.ts create mode 100644 src/plugins/materializer/README.md create mode 100644 src/plugins/materializer/index.ts create mode 100644 src/plugins/materializer/messagematerializer.ts create mode 100644 src/plugins/materializer/partial-json.ts create mode 100644 test/realtime/materializer.test.js diff --git a/examples/materializer-demo.ts b/examples/materializer-demo.ts new file mode 100644 index 0000000000..6bc198b18f --- /dev/null +++ b/examples/materializer-demo.ts @@ -0,0 +1,219 @@ +/** + * MessageMaterializer POC Demo + * + * Demonstrates: + * 1. Streaming JSON via message.append (simulating AI/LLM token streaming) + * 2. MessageMaterializer accumulating appends automatically + * 3. toPartialJSON() rendering incomplete JSON at each step + * 4. Late-joiner catching up via getMessage() + * + * Prerequisites: + * - An Ably app with a channel namespace (e.g., "ai") that has + * "Message annotations, updates, and deletes" (mutableMessages) enabled. + * See README.md for setup instructions. + * + * Usage: + * export ABLY_API_KEY="your-api-key" + * npm run build:node + * npx tsx examples/materializer-demo.ts + */ + +import * as Ably from 'ably'; +import { MessageMaterializer, MaterializedMessage } from '../src/plugins/materializer'; + +const API_KEY = process.env.ABLY_API_KEY; +if (!API_KEY) { + console.error('Error: Set ABLY_API_KEY environment variable'); + process.exit(1); +} + +// Channel namespace must have mutableMessages enabled in the Ably dashboard +const CHANNEL_NAMESPACE = process.env.ABLY_CHANNEL_NAMESPACE || 'ai'; +const CHANNEL_NAME = `${CHANNEL_NAMESPACE}:materializer-demo-${Date.now()}`; + +// Simulated AI response tokens — these build up a JSON object piece by piece +const JSON_START = + '{"model": "claude-opus-4-20250514", "usage": {"input_tokens": 150, "output_tokens": 0}, "choices": [{"finish_reason": null, "message": {"role": "assistant", "content": "'; + +const TOKENS = [ + 'The ', + 'quick ', + 'brown ', + 'fox ', + 'jumps ', + 'over ', + 'the ', + 'lazy ', + 'dog. ', + 'This ', + 'is ', + 'a ', + 'demonstration ', + 'of ', + 'streaming ', + 'AI ', + 'responses ', + 'over ', + 'Ably ', + 'using ', + 'message.append.', +]; + +const JSON_END = '"}}]}'; + +function showMessage(label: string, msg: MaterializedMessage): void { + const parsed = msg.toPartialJSON>(); + console.log(` [${label}] Partial JSON:`); + if (parsed) { + console.log( + JSON.stringify(parsed, null, 2) + .split('\n') + .map((line) => ' ' + line) + .join('\n'), + ); + } else { + console.log(' (not parseable as JSON)'); + } + console.log(); +} + +async function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function main(): Promise { + console.log('=== MessageMaterializer POC Demo ===\n'); + console.log(`Channel: ${CHANNEL_NAME}\n`); + + // Create publisher client + const publisher = new Ably.Realtime({ key: API_KEY, clientId: 'publisher' }); + await publisher.connection.once('connected'); + console.log('Publisher connected.'); + + // Create subscriber client + const subscriber = new Ably.Realtime({ key: API_KEY, clientId: 'subscriber-a' }); + await subscriber.connection.once('connected'); + console.log('Subscriber A connected.\n'); + + const pubChannel = publisher.channels.get(CHANNEL_NAME); + const subChannel = subscriber.channels.get(CHANNEL_NAME); + + // Set up Subscriber A with MessageMaterializer + const materializerA = new MessageMaterializer(subChannel); + + // We need the serial from the first published message to send appends. + // Get it from the subscriber's received create message. + let appendCount = 0; + const totalTokens = TOKENS.length + 1; // +1 for final JSON_END append + + const serialReady = new Promise((resolve) => { + const allAppendsDoneResolve = { resolve: () => {} }; + (globalThis as any).__allAppendsDone = new Promise((r) => { + allAppendsDoneResolve.resolve = r; + }); + + materializerA.subscribe((msg) => { + if (msg.action === 'message.create') { + console.log('--- Subscriber A: initial create ---'); + showMessage('A', msg); + resolve(msg.serial); + } else if (msg.action === 'message.append') { + appendCount++; + // Show every 7th append + the last token append + the final JSON_END append + if (appendCount % 7 === 0 || appendCount === TOKENS.length || appendCount === totalTokens) { + console.log(`--- Subscriber A: after ${appendCount}/${totalTokens} appends ---`); + showMessage('A', msg); + } + if (appendCount === totalTokens) { + allAppendsDoneResolve.resolve(); + } + } + }); + }); + + // Publish initial message with start of JSON + console.log('Publishing initial message...\n'); + await pubChannel.publish({ name: 'ai-response', data: JSON_START }); + + // Wait for subscriber to receive it and give us the serial + const messageSerial = await serialReady; + + // Stream tokens via message.append + console.log('Streaming tokens via message.append...\n'); + for (let i = 0; i < TOKENS.length; i++) { + try { + await pubChannel.appendMessage({ serial: messageSerial, data: TOKENS[i] } as Ably.Message); + } catch (err: any) { + if (err.code === 93002) { + console.error( + '\nError: mutableMessages is not enabled for this channel namespace.\n' + + 'Go to your Ably dashboard > App Settings > Channel Rules and add a rule\n' + + `for the "${CHANNEL_NAMESPACE}" namespace with "Message annotations, updates,\n` + + 'and deletes" enabled.\n', + ); + process.exit(1); + } + throw err; + } + await sleep(100); // Simulate streaming delay + } + + // Wait a bit before late-joiner + await sleep(500); + + // === Late-joiner: Subscriber B connects mid-stream === + console.log('--- Late-joiner (Subscriber B) connecting... ---\n'); + + const lateJoiner = new Ably.Realtime({ key: API_KEY, clientId: 'subscriber-b' }); + await lateJoiner.connection.once('connected'); + console.log('Subscriber B connected (with rewind — gets full materialized state).\n'); + + // Late-joiner uses rewind to get history + const lateChannel = lateJoiner.channels.get(CHANNEL_NAME, { + params: { rewind: '100' }, + }); + + const materializerB = new MessageMaterializer(lateChannel); + + const lateJoinerDone = new Promise((resolve) => { + materializerB.subscribe((msg) => { + console.log(`--- Subscriber B (late-joiner): received ${msg.action} ---`); + showMessage('B', msg); + resolve(); + }); + }); + + // Wait for late-joiner to get the rewind message + await Promise.race([lateJoinerDone, sleep(5000)]); + + // Publish the closing JSON to complete the object + console.log('Publishing final append (closing JSON structure)...\n'); + await pubChannel.appendMessage({ serial: messageSerial, data: JSON_END } as Ably.Message); + + // Wait for all appends to be received + await Promise.race([(globalThis as any).__allAppendsDone, sleep(5000)]); + await sleep(500); + + // Show final state from Subscriber A + console.log('=== Final Materialized State (Subscriber A) ===\n'); + const messages = materializerA.getMessages(); + for (const msg of messages) { + const parsed = msg.toPartialJSON>(); + if (parsed) { + console.log(JSON.stringify(parsed, null, 2)); + } + } + + // Cleanup + console.log('\nDone.'); + materializerA.unsubscribe(); + materializerB.unsubscribe(); + publisher.close(); + subscriber.close(); + lateJoiner.close(); +} + +main().catch((err) => { + console.error('Demo failed:', err); + process.exit(1); +}); diff --git a/grunt/esbuild/build.js b/grunt/esbuild/build.js index b6d7b95c04..4bd231cc3e 100644 --- a/grunt/esbuild/build.js +++ b/grunt/esbuild/build.js @@ -109,6 +109,21 @@ const minifiedLiveObjectsPluginCdnConfig = { minify: true, }; +const materializerPluginConfig = { + ...createBaseConfig(), + entryPoints: ['src/plugins/materializer/index.ts'], + plugins: [umdWrapper.default({ libraryName: 'AblyMessageMaterializerPlugin', amdNamedModule: false })], + outfile: 'build/materializer.js', +}; + +const materializerPluginEsmConfig = { + ...createBaseConfig(), + format: 'esm', + plugins: [], + entryPoints: ['src/plugins/materializer/index.ts'], + outfile: 'build/materializer.mjs', +}; + module.exports = { webConfig, minifiedWebConfig, @@ -121,4 +136,6 @@ module.exports = { liveObjectsPluginEsmConfig, liveObjectsPluginCdnConfig, minifiedLiveObjectsPluginCdnConfig, + materializerPluginConfig, + materializerPluginEsmConfig, }; diff --git a/src/plugins/materializer/README.md b/src/plugins/materializer/README.md new file mode 100644 index 0000000000..e25c7bb446 --- /dev/null +++ b/src/plugins/materializer/README.md @@ -0,0 +1,133 @@ +# MessageMaterializer — Proof of Concept + +## What This Is + +A proof of concept demonstrating two convenience features that we believe the SDK should offer natively, motivated by the AI/LLM token streaming use case. + +## Problem 1 — Message Materialization + +Users streaming data via Ably's `message.append` (e.g., AI/LLM token streaming) must manually: +- Track message serials +- Accumulate appended data chunks +- Handle late-join (fetching current state from history) +- Deal with ordering and version watermarks + +The `MessageMaterializer` handles all of this automatically. Subscribing emits the full materialized message (original data + all appends concatenated), similar to how annotations emit the full summary so users don't have to apply incremental updates. + +## Problem 2 — Partial JSON Rendering + +When streaming JSON (e.g., structured AI responses), the accumulated data is often incomplete JSON at any given point during the stream. For example: + +``` +{"model": "claude-opus-4-20250514", "choices": [{"message": {"content": "The quick bro +``` + +`toPartialJSON()` parses this into a valid partial object: + +```json +{ + "model": "claude-opus-4-20250514", + "choices": [ + { + "message": { + "content": "The quick bro" + } + } + ] +} +``` + +This lets users display structured data progressively as it streams in. + +## Why Coupled + +Both features address the same use case (streaming AI responses over Ably) and the materializer is the natural place to offer partial JSON since it's already managing the accumulated state. + +## Status + +**POC only. Not intended for production.** + +- The `partial-json` parser is vendored inline (~280 lines); dependency decisions would be made if this moves to production +- The API surface may change + +## Prerequisites — Channel Namespace Setup + +The `message.append` (and `message.update`/`message.delete`) operations require **mutableMessages** to be enabled on the channel namespace. Without this, append operations will fail with error code `93002`. + +### Setup Steps + +1. Go to the [Ably Dashboard](https://ably.com/accounts) +2. Select your app +3. Go to **Settings** > **Channel Rules** +4. Add a new channel rule (or edit an existing one): + - **Namespace**: `ai` (or whatever namespace you want to use) + - Enable **"Message annotations, updates, and deletes"** (this enables `mutableMessages`) +5. Save the rule + +Your channels must then use the namespace prefix, e.g., `ai:my-channel`. + +## Usage + +```typescript +import * as Ably from 'ably'; +import { MessageMaterializer } from './src/plugins/materializer'; + +// Channel must use a namespace with mutableMessages enabled +const client = new Ably.Realtime({ key: 'your-api-key' }); +const channel = client.channels.get('ai:my-channel'); + +const materializer = new MessageMaterializer(channel); + +materializer.subscribe((msg) => { + // msg.data contains the full accumulated string (original + all appends) + console.log('Accumulated:', msg.data); + + // Parse incomplete JSON for structured rendering + const parsed = msg.toPartialJSON(); + console.log('Partial JSON:', JSON.stringify(parsed, null, 2)); +}); +``` + +## How to Run the Demo + +```bash +# Build the SDK first (demo imports from the local build) +npm run build:node + +# Set your Ably API key +export ABLY_API_KEY="your-api-key" + +# Optionally set the channel namespace (defaults to "ai") +export ABLY_CHANNEL_NAMESPACE="ai" + +# Run with tsx +npx tsx examples/materializer-demo.ts +``` + +The demo creates a publisher and two subscribers (one immediate, one late-joiner) to show: +1. Progressive accumulation of appended data +2. `toPartialJSON()` rendering at each step +3. Late-join catch-up via `getMessage()` +4. The complete final JSON object + +## Architecture + +``` +src/plugins/materializer/ +├── index.ts # Barrel export +├── messagematerializer.ts # Core class +├── partial-json.ts # Vendored partial JSON parser +└── README.md # This file +``` + +The `MessageMaterializer` sits between the Ably channel and the user's listener: + +``` +Channel → MessageMaterializer → User Listener + ↕ + Internal Cache + (Map) + ↕ + getMessage() for + late-join fetch +``` diff --git a/src/plugins/materializer/index.ts b/src/plugins/materializer/index.ts new file mode 100644 index 0000000000..89becd7666 --- /dev/null +++ b/src/plugins/materializer/index.ts @@ -0,0 +1,3 @@ +export { MessageMaterializer } from './messagematerializer'; +export type { MaterializedMessage } from './messagematerializer'; +export { parsePartialJSON, Allow } from './partial-json'; diff --git a/src/plugins/materializer/messagematerializer.ts b/src/plugins/materializer/messagematerializer.ts new file mode 100644 index 0000000000..26b13abc12 --- /dev/null +++ b/src/plugins/materializer/messagematerializer.ts @@ -0,0 +1,362 @@ +import type * as API from '../../../ably'; +import { parsePartialJSON, Allow } from './partial-json'; + +/** + * A materialized message extends the standard InboundMessage with a convenience + * method for parsing incomplete JSON data (useful for AI/LLM token streaming). + */ +export interface MaterializedMessage extends API.InboundMessage { + /** + * Attempt to parse the message's accumulated `data` as JSON. + * If the JSON is incomplete (e.g., mid-stream), uses a partial JSON parser + * to return the best-effort result. + * + * @returns The parsed value, or `undefined` if parsing fails entirely + */ + toPartialJSON(): T | undefined; +} + +interface PendingFetch { + /** Queued appends waiting for the fetch to complete */ + queued: API.InboundMessage[]; + /** Promise for the in-flight getMessage() call */ + promise: Promise; +} + +type MessageListener = (message: MaterializedMessage) => void; +type FilteredListener = { + event: string | null; + userListener: MessageListener; +}; + +/** + * MessageMaterializer provides a convenience layer over Ably's `message.append` + * functionality. It automatically accumulates appended data, handles late-join + * via `channel.getMessage()`, and offers `toPartialJSON()` for rendering + * incomplete JSON objects as they stream in. + * + * Usage: + * ```typescript + * const materializer = new MessageMaterializer(channel); + * materializer.subscribe((msg) => { + * console.log('Accumulated data:', msg.data); + * console.log('Partial JSON:', msg.toPartialJSON()); + * }); + * ``` + */ +export class MessageMaterializer { + private cache: Map = new Map(); + private pendingFetches: Map = new Map(); + private listeners: FilteredListener[] = []; + private channelListener: ((message: API.InboundMessage) => void) | null = null; + private maxMessages: number; + private channel: API.RealtimeChannel; + + constructor(channel: API.RealtimeChannel, options?: { maxMessages?: number }) { + this.channel = channel; + this.maxMessages = options?.maxMessages ?? 100; + } + + /** + * Subscribe to materialized messages. Each emission contains the full + * accumulated state of the message (original + all appends applied). + * + * @param listenerOrEvent - Either a listener callback, or an event name to filter on + * @param listener - The listener callback (when filtering by event name) + */ + async subscribe(listenerOrEvent: string | MessageListener, listener?: MessageListener): Promise { + let event: string | null = null; + let userListener: MessageListener; + + if (typeof listenerOrEvent === 'function') { + userListener = listenerOrEvent; + } else { + event = listenerOrEvent; + if (!listener) throw new Error('MessageMaterializer.subscribe(): listener is required when filtering by event'); + userListener = listener; + } + + this.listeners.push({ event, userListener }); + + // Set up underlying channel subscription if this is the first listener + if (!this.channelListener) { + this.channelListener = (message: API.InboundMessage) => { + this.handleMessage(message); + }; + await this.channel.subscribe(this.channelListener); + } + } + + /** + * Unsubscribe a listener. If all listeners are removed, the underlying + * channel subscription is also cleaned up. + * + * @param listenerOrEvent - Either a listener callback, or an event name + * @param listener - The listener callback (when filtering by event name) + */ + unsubscribe(listenerOrEvent?: string | MessageListener, listener?: MessageListener): void { + if (!listenerOrEvent && !listener) { + // Remove all listeners + this.listeners = []; + } else if (typeof listenerOrEvent === 'function') { + this.listeners = this.listeners.filter((l) => l.userListener !== listenerOrEvent); + } else if (typeof listenerOrEvent === 'string') { + if (listener) { + this.listeners = this.listeners.filter( + (l) => !(l.event === listenerOrEvent && l.userListener === listener), + ); + } else { + this.listeners = this.listeners.filter((l) => l.event !== listenerOrEvent); + } + } + + // Clean up channel subscription if no listeners remain + if (this.listeners.length === 0 && this.channelListener) { + this.channel.unsubscribe(this.channelListener); + this.channelListener = null; + } + } + + /** + * Get a snapshot of all currently cached materialized messages. + */ + getMessages(): MaterializedMessage[] { + return Array.from(this.cache.values()); + } + + /** + * Get a specific cached message by serial. + */ + getMessage(serial: string): MaterializedMessage | undefined { + return this.cache.get(serial); + } + + private handleMessage(message: API.InboundMessage): void { + switch (message.action) { + case 'message.create': + this.handleCreate(message); + break; + case 'message.update': + this.handleUpdate(message); + break; + case 'message.append': + this.handleAppend(message); + break; + case 'message.delete': + this.handleDelete(message); + break; + case 'message.summary': + this.handleSummary(message); + break; + default: + // Pass through unknown actions (e.g., 'meta') + break; + } + } + + private handleCreate(message: API.InboundMessage): void { + const materialized = this.wrapMessage(message); + this.storeAndEvict(message.serial, materialized); + this.emit(materialized); + } + + private handleUpdate(message: API.InboundMessage): void { + const existing = this.cache.get(message.serial); + if (existing) { + // Only apply if version is newer + if (this.isNewerVersion(message.version, existing.version)) { + const materialized = this.wrapMessage(message); + this.storeAndEvict(message.serial, materialized); + this.emit(materialized); + } + } else { + // No cached version — store as-is (server has already materialized it for rewind) + const materialized = this.wrapMessage(message); + this.storeAndEvict(message.serial, materialized); + this.emit(materialized); + } + } + + private handleAppend(message: API.InboundMessage): void { + const serial = message.serial; + const existing = this.cache.get(serial); + + if (existing) { + // Append data to cached message + this.applyAppend(existing, message); + this.emit(existing); + } else if (this.pendingFetches.has(serial)) { + // Already fetching — queue this append + this.pendingFetches.get(serial)!.queued.push(message); + } else { + // Unknown serial — fetch from server and queue + this.fetchAndMaterialize(serial, message); + } + } + + private handleDelete(message: API.InboundMessage): void { + const existing = this.cache.get(message.serial); + if (existing) { + if (this.isNewerVersion(message.version, existing.version)) { + // Update with delete action + existing.action = message.action; + existing.version = message.version; + existing.timestamp = message.timestamp; + this.emit(existing); + } + } else { + const materialized = this.wrapMessage(message); + this.storeAndEvict(message.serial, materialized); + this.emit(materialized); + } + } + + private handleSummary(message: API.InboundMessage): void { + const existing = this.cache.get(message.serial); + if (existing) { + existing.annotations = message.annotations; + existing.action = message.action; + this.emit(existing); + } else { + // Store the summary even without a cached message + const materialized = this.wrapMessage(message); + this.storeAndEvict(message.serial, materialized); + this.emit(materialized); + } + } + + private fetchAndMaterialize(serial: string, firstAppend: API.InboundMessage): void { + const pending: PendingFetch = { + queued: [firstAppend], + promise: Promise.resolve(), + }; + + pending.promise = (async () => { + try { + const fetched = await this.channel.getMessage(serial); + const materialized = this.wrapMessage(fetched as API.InboundMessage); + + // Apply queued appends that are newer than the fetched version + const watermark = fetched.version?.serial; + for (const queuedAppend of pending.queued) { + if (watermark && queuedAppend.version?.serial && queuedAppend.version.serial <= watermark) { + // This append is already included in the fetched state — skip + continue; + } + this.applyAppend(materialized, queuedAppend); + } + + this.storeAndEvict(serial, materialized); + this.emit(materialized); + } catch (err) { + // If fetch fails, create a best-effort message from the queued appends + console.warn(`MessageMaterializer: Failed to fetch message ${serial}:`, err); + const bestEffort = this.wrapMessage(firstAppend); + // Apply remaining queued appends + for (let i = 1; i < pending.queued.length; i++) { + this.applyAppend(bestEffort, pending.queued[i]); + } + this.storeAndEvict(serial, bestEffort); + this.emit(bestEffort); + } finally { + this.pendingFetches.delete(serial); + } + })(); + + this.pendingFetches.set(serial, pending); + } + + private applyAppend(target: MaterializedMessage, append: API.InboundMessage): void { + // Concatenate string data + if (typeof target.data === 'string' && typeof append.data === 'string') { + target.data = target.data + append.data; + } else if (target.data instanceof ArrayBuffer && append.data instanceof ArrayBuffer) { + // Concatenate binary data + const combined = new Uint8Array(target.data.byteLength + append.data.byteLength); + combined.set(new Uint8Array(target.data), 0); + combined.set(new Uint8Array(append.data), target.data.byteLength); + target.data = combined.buffer; + } else { + // Fallback: coerce to string + target.data = String(target.data ?? '') + String(append.data ?? ''); + } + + // Update version and action metadata + target.version = append.version; + target.action = append.action; + } + + private wrapMessage(message: API.InboundMessage): MaterializedMessage { + // Create a shallow copy with toPartialJSON added + const materialized: MaterializedMessage = { + ...message, + toPartialJSON(): T | undefined { + const data = this.data; + if (data == null) return undefined; + + const str = typeof data === 'string' ? data : String(data); + if (!str.trim()) return undefined; + + // Try complete JSON first + try { + return JSON.parse(str) as T; + } catch { + // Fall through to partial parsing + } + + // Try partial JSON parsing + try { + return parsePartialJSON(str, Allow.ALL) as T; + } catch { + return undefined; + } + }, + }; + + return materialized; + } + + private storeAndEvict(serial: string, message: MaterializedMessage): void { + this.cache.set(serial, message); + + // Evict oldest entries if over capacity + if (this.cache.size > this.maxMessages) { + const keysToDelete: string[] = []; + let count = 0; + const excess = this.cache.size - this.maxMessages; + this.cache.forEach((_value, key) => { + if (count >= excess) return; + keysToDelete.push(key); + count++; + }); + keysToDelete.forEach((key) => this.cache.delete(key)); + } + } + + private emit(message: MaterializedMessage): void { + for (const { event, userListener } of this.listeners) { + if (event === null || event === message.name) { + try { + userListener(message); + } catch (err) { + console.error('MessageMaterializer: Listener threw an exception:', err); + } + } + } + } + + private isNewerVersion( + incoming: API.MessageVersion, + existing: API.MessageVersion, + ): boolean { + // Compare by version serial (lexicographic), then by timestamp + if (incoming.serial && existing.serial) { + return incoming.serial > existing.serial; + } + if (incoming.timestamp && existing.timestamp) { + return incoming.timestamp > existing.timestamp; + } + // If we can't compare, assume incoming is newer + return true; + } +} diff --git a/src/plugins/materializer/partial-json.ts b/src/plugins/materializer/partial-json.ts new file mode 100644 index 0000000000..1e34f478b7 --- /dev/null +++ b/src/plugins/materializer/partial-json.ts @@ -0,0 +1,415 @@ +/** + * partial-json - Parse incomplete JSON strings + * + * Vendored from https://github.com/promplate/partial-json-parser-js + * MIT License - Copyright (c) 2024 promplate + * + * This is a recursive descent parser that takes incomplete JSON and returns + * the best-effort parsed result. Useful for rendering streaming JSON as it + * arrives (e.g., AI/LLM token streaming). + */ + +// Allow flags to control what incomplete structures are acceptable +export const Allow = { + STR: 0x1, + NUM: 0x2, + ARR: 0x4, + OBJ: 0x8, + NULL: 0x10, + BOOL: 0x20, + NAN: 0x40, + INFINITY: 0x80, + _INFINITY: 0x100, + ALL: + 0x1 | 0x2 | 0x4 | 0x8 | 0x10 | 0x20 | 0x40 | 0x80 | 0x100, +} as const; + +type AllowFlags = number; + +class PartialJSON { + private pos: number = 0; + private str: string = ''; + + parse(str: string, allowPartial: AllowFlags = Allow.ALL): unknown { + if (typeof str !== 'string') { + throw new TypeError(`expecting str, got ${typeof str}`); + } + if (!str.trim()) { + throw new Error(`Unexpected end of input`); + } + this.str = str; + this.pos = 0; + return this.parseValue(allowPartial); + } + + private parseValue(allowPartial: AllowFlags): unknown { + this.skipWhitespace(); + if (this.pos >= this.str.length) { + throw new Error('Unexpected end of input'); + } + + const ch = this.str[this.pos]; + + if (ch === '"') return this.parseString(allowPartial); + if (ch === '{') return this.parseObject(allowPartial); + if (ch === '[') return this.parseArray(allowPartial); + if (ch === '-' || (ch >= '0' && ch <= '9')) return this.parseNumber(allowPartial); + if (ch === 't' || ch === 'f') return this.parseBoolean(allowPartial); + if (ch === 'n') return this.parseNull(allowPartial); + if (ch === 'N') return this.parseNaN(allowPartial); + if (ch === 'I') return this.parseInfinity(allowPartial); + + throw new Error(`Unexpected character '${ch}' at position ${this.pos}`); + } + + private skipWhitespace(): void { + while (this.pos < this.str.length && ' \t\n\r'.includes(this.str[this.pos])) { + this.pos++; + } + } + + private parseString(allowPartial: AllowFlags): string { + // Skip opening quote + this.pos++; + let result = ''; + + while (this.pos < this.str.length) { + const ch = this.str[this.pos]; + + if (ch === '\\') { + if (this.pos + 1 >= this.str.length) { + // Incomplete escape at end + if (allowPartial & Allow.STR) return result; + throw new Error('Unexpected end of input in string escape'); + } + const next = this.str[this.pos + 1]; + this.pos += 2; + switch (next) { + case '"': result += '"'; break; + case '\\': result += '\\'; break; + case '/': result += '/'; break; + case 'b': result += '\b'; break; + case 'f': result += '\f'; break; + case 'n': result += '\n'; break; + case 'r': result += '\r'; break; + case 't': result += '\t'; break; + case 'u': { + // Unicode escape: \uXXXX + if (this.pos + 4 > this.str.length) { + // Incomplete unicode escape + if (allowPartial & Allow.STR) return result; + throw new Error('Unexpected end of input in unicode escape'); + } + const hex = this.str.substring(this.pos, this.pos + 4); + const code = parseInt(hex, 16); + if (isNaN(code)) { + throw new Error(`Invalid unicode escape: \\u${hex}`); + } + result += String.fromCharCode(code); + this.pos += 4; + break; + } + default: + result += next; + } + continue; + } + + if (ch === '"') { + this.pos++; // Skip closing quote + return result; + } + + result += ch; + this.pos++; + } + + // Reached end without closing quote + if (allowPartial & Allow.STR) return result; + throw new Error('Unexpected end of input in string'); + } + + private parseNumber(allowPartial: AllowFlags): number { + const start = this.pos; + + if (this.str[this.pos] === '-') { + this.pos++; + if (this.pos >= this.str.length) { + if (allowPartial & Allow.NUM) return 0; + throw new Error('Unexpected end of input in number'); + } + // Check for -Infinity + if (this.str[this.pos] === 'I') { + return this.parseInfinity(allowPartial, true); + } + } + + // Integer part + while (this.pos < this.str.length && this.str[this.pos] >= '0' && this.str[this.pos] <= '9') { + this.pos++; + } + + // Decimal part + if (this.pos < this.str.length && this.str[this.pos] === '.') { + this.pos++; + while (this.pos < this.str.length && this.str[this.pos] >= '0' && this.str[this.pos] <= '9') { + this.pos++; + } + } + + // Exponent part + if (this.pos < this.str.length && (this.str[this.pos] === 'e' || this.str[this.pos] === 'E')) { + this.pos++; + if (this.pos < this.str.length && (this.str[this.pos] === '+' || this.str[this.pos] === '-')) { + this.pos++; + } + while (this.pos < this.str.length && this.str[this.pos] >= '0' && this.str[this.pos] <= '9') { + this.pos++; + } + } + + const numStr = this.str.substring(start, this.pos); + const num = Number(numStr); + + if (isNaN(num)) { + if (allowPartial & Allow.NUM) return 0; + throw new Error(`Invalid number: ${numStr}`); + } + + return num; + } + + private parseBoolean(allowPartial: AllowFlags): boolean { + if (this.str.startsWith('true', this.pos)) { + this.pos += 4; + return true; + } + if (this.str.startsWith('false', this.pos)) { + this.pos += 5; + return false; + } + + // Partial match + if (allowPartial & Allow.BOOL) { + if ('true'.startsWith(this.str.substring(this.pos))) { + this.pos = this.str.length; + return true; + } + if ('false'.startsWith(this.str.substring(this.pos))) { + this.pos = this.str.length; + return false; + } + } + + throw new Error(`Unexpected token at position ${this.pos}`); + } + + private parseNull(allowPartial: AllowFlags): null { + if (this.str.startsWith('null', this.pos)) { + this.pos += 4; + return null; + } + + if (allowPartial & Allow.NULL) { + if ('null'.startsWith(this.str.substring(this.pos))) { + this.pos = this.str.length; + return null; + } + } + + throw new Error(`Unexpected token at position ${this.pos}`); + } + + private parseNaN(allowPartial: AllowFlags): number { + if (this.str.startsWith('NaN', this.pos)) { + this.pos += 3; + return NaN; + } + + if (allowPartial & Allow.NAN) { + if ('NaN'.startsWith(this.str.substring(this.pos))) { + this.pos = this.str.length; + return NaN; + } + } + + throw new Error(`Unexpected token at position ${this.pos}`); + } + + private parseInfinity(allowPartial: AllowFlags, negative: boolean = false): number { + if (this.str.startsWith('Infinity', this.pos)) { + this.pos += 8; + return negative ? -Infinity : Infinity; + } + + const flag = negative ? Allow._INFINITY : Allow.INFINITY; + if (allowPartial & flag) { + if ('Infinity'.startsWith(this.str.substring(this.pos))) { + this.pos = this.str.length; + return negative ? -Infinity : Infinity; + } + } + + throw new Error(`Unexpected token at position ${this.pos}`); + } + + private parseArray(allowPartial: AllowFlags): unknown[] { + // Skip opening bracket + this.pos++; + const arr: unknown[] = []; + + this.skipWhitespace(); + + if (this.pos >= this.str.length) { + if (allowPartial & Allow.ARR) return arr; + throw new Error('Unexpected end of input in array'); + } + + if (this.str[this.pos] === ']') { + this.pos++; + return arr; + } + + while (this.pos < this.str.length) { + this.skipWhitespace(); + if (this.pos >= this.str.length) { + if (allowPartial & Allow.ARR) return arr; + throw new Error('Unexpected end of input in array'); + } + + try { + const value = this.parseValue(allowPartial); + arr.push(value); + } catch { + if (allowPartial & Allow.ARR) return arr; + throw new Error('Unexpected end of input in array'); + } + + this.skipWhitespace(); + if (this.pos >= this.str.length) { + if (allowPartial & Allow.ARR) return arr; + throw new Error('Unexpected end of input in array'); + } + + if (this.str[this.pos] === ',') { + this.pos++; + continue; + } + + if (this.str[this.pos] === ']') { + this.pos++; + return arr; + } + + if (allowPartial & Allow.ARR) return arr; + throw new Error(`Expected ',' or ']' at position ${this.pos}`); + } + + if (allowPartial & Allow.ARR) return arr; + throw new Error('Unexpected end of input in array'); + } + + private parseObject(allowPartial: AllowFlags): Record { + // Skip opening brace + this.pos++; + const obj: Record = {}; + + this.skipWhitespace(); + + if (this.pos >= this.str.length) { + if (allowPartial & Allow.OBJ) return obj; + throw new Error('Unexpected end of input in object'); + } + + if (this.str[this.pos] === '}') { + this.pos++; + return obj; + } + + while (this.pos < this.str.length) { + this.skipWhitespace(); + if (this.pos >= this.str.length) { + if (allowPartial & Allow.OBJ) return obj; + throw new Error('Unexpected end of input in object'); + } + + if (this.str[this.pos] !== '"') { + if (allowPartial & Allow.OBJ) return obj; + throw new Error(`Expected '"' at position ${this.pos}`); + } + + let key: string; + try { + key = this.parseString(allowPartial); + } catch { + if (allowPartial & Allow.OBJ) return obj; + throw new Error('Unexpected end of input in object key'); + } + + this.skipWhitespace(); + if (this.pos >= this.str.length) { + // Key parsed but no colon yet — partial object + if (allowPartial & Allow.OBJ) return obj; + throw new Error('Unexpected end of input in object'); + } + + if (this.str[this.pos] !== ':') { + if (allowPartial & Allow.OBJ) return obj; + throw new Error(`Expected ':' at position ${this.pos}`); + } + this.pos++; // Skip colon + + this.skipWhitespace(); + if (this.pos >= this.str.length) { + // Key and colon parsed but no value — partial object + if (allowPartial & Allow.OBJ) return obj; + throw new Error('Unexpected end of input in object value'); + } + + let value: unknown; + try { + value = this.parseValue(allowPartial); + } catch { + if (allowPartial & Allow.OBJ) return obj; + throw new Error('Unexpected end of input in object value'); + } + + obj[key] = value; + + this.skipWhitespace(); + if (this.pos >= this.str.length) { + if (allowPartial & Allow.OBJ) return obj; + throw new Error('Unexpected end of input in object'); + } + + if (this.str[this.pos] === ',') { + this.pos++; + continue; + } + + if (this.str[this.pos] === '}') { + this.pos++; + return obj; + } + + if (allowPartial & Allow.OBJ) return obj; + throw new Error(`Expected ',' or '}' at position ${this.pos}`); + } + + if (allowPartial & Allow.OBJ) return obj; + throw new Error('Unexpected end of input in object'); + } +} + +const parser = new PartialJSON(); + +/** + * Parse a possibly-incomplete JSON string, returning the best-effort result. + * + * @param str - The (possibly incomplete) JSON string to parse + * @param allowPartial - Bitflags controlling which partial structures are accepted (default: Allow.ALL) + * @returns The parsed value + */ +export function parsePartialJSON(str: string, allowPartial: AllowFlags = Allow.ALL): unknown { + return parser.parse(str, allowPartial); +} diff --git a/test/common/globals/named_dependencies.js b/test/common/globals/named_dependencies.js index db0d819e60..969fae5b2a 100644 --- a/test/common/globals/named_dependencies.js +++ b/test/common/globals/named_dependencies.js @@ -15,6 +15,10 @@ define(function () { browser: 'build/liveobjects', node: 'build/liveobjects', }, + materializer: { + browser: 'build/materializer', + node: 'build/materializer', + }, // test modules globals: { browser: 'test/common/globals/environment', node: 'test/common/globals/environment' }, diff --git a/test/realtime/materializer.test.js b/test/realtime/materializer.test.js new file mode 100644 index 0000000000..7c2b489c03 --- /dev/null +++ b/test/realtime/materializer.test.js @@ -0,0 +1,617 @@ +'use strict'; + +define(['ably', 'shared_helper', 'chai', 'materializer'], function (Ably, Helper, chai, Materializer) { + const expect = chai.expect; + const MessageMaterializer = Materializer.MessageMaterializer; + const parsePartialJSON = Materializer.parsePartialJSON; + + describe('realtime/materializer', function () { + this.timeout(60 * 1000); + + before(function (done) { + const helper = Helper.forHook(this); + helper.setupApp(function (err) { + if (err) { + done(err); + return; + } + done(); + }); + }); + + describe('partial-json parser', function () { + it('parses complete JSON', function () { + expect(parsePartialJSON('{"a": 1, "b": "hello"}')).to.deep.equal({ a: 1, b: 'hello' }); + }); + + it('parses incomplete object (missing closing brace)', function () { + const result = parsePartialJSON('{"a": 1, "b": "hello"'); + expect(result).to.deep.equal({ a: 1, b: 'hello' }); + }); + + it('parses incomplete string value', function () { + const result = parsePartialJSON('{"content": "The quick bro'); + expect(result).to.deep.equal({ content: 'The quick bro' }); + }); + + it('parses incomplete nested object', function () { + const result = parsePartialJSON('{"model": "gpt-4", "choices": [{"message": {"content": "hel'); + expect(result).to.deep.equal({ + model: 'gpt-4', + choices: [{ message: { content: 'hel' } }], + }); + }); + + it('parses incomplete array', function () { + const result = parsePartialJSON('[1, 2, 3'); + expect(result).to.deep.equal([1, 2, 3]); + }); + + it('parses empty incomplete object', function () { + const result = parsePartialJSON('{'); + expect(result).to.deep.equal({}); + }); + + it('parses empty incomplete array', function () { + const result = parsePartialJSON('['); + expect(result).to.deep.equal([]); + }); + + it('parses incomplete key (no value yet)', function () { + const result = parsePartialJSON('{"a": 1, "b"'); + expect(result).to.deep.equal({ a: 1 }); + }); + + it('parses incomplete key with colon (no value yet)', function () { + const result = parsePartialJSON('{"a": 1, "b":'); + expect(result).to.deep.equal({ a: 1 }); + }); + + it('throws for empty input', function () { + expect(function () { parsePartialJSON(''); }).to.throw(); + expect(function () { parsePartialJSON(' '); }).to.throw(); + }); + + it('parses booleans and null', function () { + expect(parsePartialJSON('true')).to.equal(true); + expect(parsePartialJSON('false')).to.equal(false); + expect(parsePartialJSON('null')).to.equal(null); + }); + + it('parses numbers', function () { + expect(parsePartialJSON('42')).to.equal(42); + expect(parsePartialJSON('-3.14')).to.equal(-3.14); + expect(parsePartialJSON('1e10')).to.equal(1e10); + }); + + it('handles string escapes', function () { + const result = parsePartialJSON('{"msg": "hello\\nworld"}'); + expect(result).to.deep.equal({ msg: 'hello\nworld' }); + }); + + it('handles unicode escapes', function () { + const result = parsePartialJSON('{"emoji": "\\u0041"}'); + expect(result).to.deep.equal({ emoji: 'A' }); + }); + }); + + describe('MessageMaterializer', function () { + /** + * Test that materializer receives create messages and emits them with toPartialJSON + */ + it('emits create messages with toPartialJSON', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_create_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + const received = new Promise(function (resolve) { + materializer.subscribe(function (msg) { + resolve(msg); + }); + }); + + await channel.publish('test', '{"key": "value"}'); + + const msg = await received; + expect(msg.action).to.equal('message.create'); + expect(msg.data).to.equal('{"key": "value"}'); + expect(msg.toPartialJSON).to.be.a('function'); + + const parsed = msg.toPartialJSON(); + expect(parsed).to.deep.equal({ key: 'value' }); + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test that materializer accumulates appended data + */ + it('accumulates message.append data', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_append_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + var appendResolve; + var appendDone = new Promise(function (resolve) { + appendResolve = resolve; + }); + + materializer.subscribe(function (msg) { + if (msg.action === 'message.append') { + appendResolve(msg); + } + }); + + // Publish original message + var result = await channel.publish('stream', 'Hello'); + var serial = result.serials[0]; + + // Append data + await channel.appendMessage({ serial: serial, data: ' World' }); + + var appendedMsg = await appendDone; + + // The materializer should have accumulated the data + expect(appendedMsg.data).to.equal('Hello World'); + expect(appendedMsg.action).to.equal('message.append'); + expect(appendedMsg.serial).to.equal(serial); + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test that toPartialJSON works with incomplete JSON being streamed + */ + it('toPartialJSON renders incomplete JSON during streaming', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_partial_json_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + var latestMsg; + var appendCount = 0; + var allDone = new Promise(function (resolve) { + materializer.subscribe(function (msg) { + latestMsg = msg; + if (msg.action === 'message.append') { + appendCount++; + if (appendCount === 2) resolve(); + } + }); + }); + + // Publish start of JSON object + var result = await channel.publish('ai-response', '{"model": "test", "content": "'); + var serial = result.serials[0]; + + // Append tokens + await channel.appendMessage({ serial: serial, data: 'Hello ' }); + await channel.appendMessage({ serial: serial, data: 'World' }); + + await allDone; + + // The partial JSON should parse the incomplete JSON + var parsed = latestMsg.toPartialJSON(); + expect(parsed).to.have.property('model', 'test'); + expect(parsed).to.have.property('content', 'Hello World'); + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test that materializer handles multiple appends in sequence + */ + it('handles multiple sequential appends', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_multi_append_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + var tokens = ['The ', 'quick ', 'brown ', 'fox']; + var appendCount = 0; + var allAppendsDone = new Promise(function (resolve) { + materializer.subscribe(function (msg) { + if (msg.action === 'message.append') { + appendCount++; + if (appendCount === tokens.length) resolve(msg); + } + }); + }); + + var result = await channel.publish('stream', 'Start: '); + var serial = result.serials[0]; + + for (var i = 0; i < tokens.length; i++) { + await channel.appendMessage({ serial: serial, data: tokens[i] }); + } + + var finalMsg = await allAppendsDone; + expect(finalMsg.data).to.equal('Start: The quick brown fox'); + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test that materializer caches messages and getMessages() returns them + */ + it('caches messages accessible via getMessages()', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_cache_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + var count = 0; + var allReceived = new Promise(function (resolve) { + materializer.subscribe(function () { + count++; + if (count === 2) resolve(); + }); + }); + + await channel.publish('msg1', 'data1'); + await channel.publish('msg2', 'data2'); + + await allReceived; + + var messages = materializer.getMessages(); + expect(messages).to.have.length(2); + expect(messages[0].name).to.equal('msg1'); + expect(messages[1].name).to.equal('msg2'); + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test getMessage() returns a specific cached message + */ + it('getMessage() returns specific cached message by serial', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_getmsg_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + var receivedSerial; + var received = new Promise(function (resolve) { + materializer.subscribe(function (msg) { + receivedSerial = msg.serial; + resolve(); + }); + }); + + await channel.publish('test', 'data'); + await received; + + var cached = materializer.getMessage(receivedSerial); + expect(cached).to.exist; + expect(cached.data).to.equal('data'); + expect(cached.name).to.equal('test'); + + // Non-existent serial returns undefined + expect(materializer.getMessage('nonexistent')).to.be.undefined; + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test memory cap / eviction + */ + it('evicts oldest messages when maxMessages is exceeded', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_eviction_' + Date.now()); + await channel.attach(); + + // Set a low maxMessages to test eviction + const materializer = new MessageMaterializer(channel, { maxMessages: 3 }); + + var count = 0; + var allReceived = new Promise(function (resolve) { + materializer.subscribe(function () { + count++; + if (count === 5) resolve(); + }); + }); + + // Publish 5 messages + for (var i = 0; i < 5; i++) { + await channel.publish('msg' + i, 'data' + i); + } + + await allReceived; + + // Should only have 3 messages (the 3 most recent) + var messages = materializer.getMessages(); + expect(messages).to.have.length(3); + expect(messages[0].name).to.equal('msg2'); + expect(messages[1].name).to.equal('msg3'); + expect(messages[2].name).to.equal('msg4'); + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test event-filtered subscription + */ + it('filters by event name when subscribing', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_filter_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + var received = []; + var done = new Promise(function (resolve) { + // Subscribe only to 'target' events + materializer.subscribe('target', function (msg) { + received.push(msg.name); + if (received.length === 2) resolve(); + }); + }); + + await channel.publish('other', 'skip this'); + await channel.publish('target', 'want this 1'); + await channel.publish('other', 'skip this too'); + await channel.publish('target', 'want this 2'); + + await done; + + expect(received).to.deep.equal(['target', 'target']); + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test unsubscribe removes specific listener + */ + it('unsubscribe removes specific listener', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_unsub_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + var received1 = []; + var received2 = []; + + var listener1 = function (msg) { received1.push(msg.name); }; + var listener2 = function (msg) { received2.push(msg.name); }; + + await materializer.subscribe(listener1); + await materializer.subscribe(listener2); + + // Publish first message and wait for listener2 to see it + var firstDone = new Promise(function (resolve) { + var origListener2 = listener2; + // We know listener2 fires after listener1, so when received2 has 'first' we're done + var checkInterval = setInterval(function () { + if (received1.length >= 1 && received2.length >= 1) { + clearInterval(checkInterval); + resolve(); + } + }, 50); + }); + await channel.publish('first', 'data'); + await firstDone; + + // Unsubscribe listener1 + materializer.unsubscribe(listener1); + + // Publish second message and wait for listener2 to see it + var secondDone = new Promise(function (resolve) { + var checkInterval = setInterval(function () { + if (received2.length >= 2) { + clearInterval(checkInterval); + resolve(); + } + }, 50); + }); + await channel.publish('second', 'data'); + await secondDone; + + // listener1 should only have first, listener2 should have both + expect(received1).to.deep.equal(['first']); + expect(received2).to.deep.equal(['first', 'second']); + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test late-join: subscriber connects after messages published, + * uses rewind to receive materialized state + */ + it('handles late-join via rewind with materialized state', async function () { + const helper = this.test.helper; + + // Publisher client + var publisher = helper.AblyRealtime(); + + try { + var channelName = 'mutable:materializer_latejoin_' + Date.now(); + var pubChannel = publisher.channels.get(channelName); + await pubChannel.attach(); + + // Publish and append before subscriber joins + var result = await pubChannel.publish('streamed', 'Hello'); + var serial = result.serials[0]; + await pubChannel.appendMessage({ serial: serial, data: ' World' }); + + // Small delay to ensure server processes the append + await new Promise(function (r) { setTimeout(r, 500); }); + + // Now create subscriber that uses rewind to get history + var subscriber = helper.AblyRealtime(); + var subChannel = subscriber.channels.get(channelName, { + params: { rewind: '100' }, + }); + + var materializer = new MessageMaterializer(subChannel); + + var received = new Promise(function (resolve) { + materializer.subscribe(function (msg) { + resolve(msg); + }); + }); + + var msg = await received; + + // Via rewind, the server sends the materialized message as message.update + // with the full accumulated data + expect(msg.data).to.equal('Hello World'); + expect(msg.serial).to.equal(serial); + + materializer.unsubscribe(); + subscriber.close(); + } finally { + publisher.close(); + } + }); + + /** + * Test that toPartialJSON returns undefined for non-JSON data + */ + it('toPartialJSON returns undefined for non-parseable data', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_nojson_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + var received = new Promise(function (resolve) { + materializer.subscribe(function (msg) { resolve(msg); }); + }); + + await channel.publish('test', 'not json at all >>>'); + + var msg = await received; + expect(msg.toPartialJSON()).to.be.undefined; + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test toPartialJSON with complete JSON returns fully parsed result + */ + it('toPartialJSON parses complete JSON correctly', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_completejson_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + var received = new Promise(function (resolve) { + materializer.subscribe(function (msg) { resolve(msg); }); + }); + + var data = JSON.stringify({ model: 'test', tokens: [1, 2, 3], done: true }); + await channel.publish('test', data); + + var msg = await received; + var parsed = msg.toPartialJSON(); + expect(parsed).to.deep.equal({ model: 'test', tokens: [1, 2, 3], done: true }); + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + + /** + * Test toPartialJSON with null data + */ + it('toPartialJSON returns undefined for null data', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:materializer_nulldata_' + Date.now()); + await channel.attach(); + + const materializer = new MessageMaterializer(channel); + + var received = new Promise(function (resolve) { + materializer.subscribe(function (msg) { resolve(msg); }); + }); + + await channel.publish('test', null); + + var msg = await received; + expect(msg.toPartialJSON()).to.be.undefined; + + materializer.unsubscribe(); + } finally { + realtime.close(); + } + }); + }); + }); +}); From 9e48be3429d7a7ad3c567f8ca37064ec0040bace Mon Sep 17 00:00:00 2001 From: Matthew O'Riordan Date: Sat, 14 Mar 2026 06:29:31 +0100 Subject: [PATCH 2/2] refactor: minimize partial-json parser and add upstream test parity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewrote the vendored partial-json parser to match the upstream (promplate/partial-json-parser-js) approach: delegate string/number parsing to JSON.parse, use closures instead of a class, and share helpers for keyword matching and error throwing. Added 20 new tests matching upstream test cases (Allow flags, edge cases, error paths). Total: 47 tests, all passing. Size reduction (minified): 6,357 → 1,488 bytes (77% smaller). --- Here are the optimization results across 5 parallel iterations: ┌────────────┬────────────────────────────────────────────────────────────────────┬────────────────┬───────────┐ │ Iteration │ Strategy │ Minified bytes │ Reduction │ ├────────────┼────────────────────────────────────────────────────────────────────┼────────────────┼───────────┤ │ Baseline │ Original class-based parser │ 6,357 │ - │ ├────────────┼────────────────────────────────────────────────────────────────────┼────────────────┼───────────┤ │ 1 │ Upstream approach (closures + JSON.parse delegation) │ 2,080 │ 67% │ ├────────────┼────────────────────────────────────────────────────────────────────┼────────────────┼───────────┤ │ 2 │ Merged container parsing + shared error helper │ 1,601 │ 75% │ ├────────────┼────────────────────────────────────────────────────────────────────┼────────────────┼───────────┤ │ 3 (winner) │ Single recursive parse fn + fail()/J aliases + s[i]<'!' whitespace │ 1,488 │ 77% │ ├────────────┼────────────────────────────────────────────────────────────────────┼────────────────┼───────────┤ │ 4 │ Lookup table for keywords + guard helper │ 1,628 │ 74% │ ├────────────┼────────────────────────────────────────────────────────────────────┼────────────────┼───────────┤ │ 5 │ Micro-optimizations (slice, charCodeAt, cached JSON.parse) │ 2,000 │ 69% │ └────────────┴────────────────────────────────────────────────────────────────────┴────────────────┴───────────┘ Key techniques in the winning version: - JSON.parse for string escape handling and number parsing (instead of manual character scanning) - Closures over local i/s/len instead of a class with this.pos/this.str - fail() and J aliases deduplicate throw Error and JSON.parse references - s[i] < '!' for whitespace (all whitespace chars are below ! in ASCII) - lit() helper handles all 6 keyword types (null/true/false/NaN/Infinity) in ~3 lines Source went from 422 lines → 105 lines. Gzipped from 1,347 → 743 bytes. --- src/plugins/materializer/partial-json.ts | 482 ++++------------------- test/realtime/materializer.test.js | 124 ++++++ 2 files changed, 210 insertions(+), 396 deletions(-) diff --git a/src/plugins/materializer/partial-json.ts b/src/plugins/materializer/partial-json.ts index 1e34f478b7..fc8471f019 100644 --- a/src/plugins/materializer/partial-json.ts +++ b/src/plugins/materializer/partial-json.ts @@ -4,412 +4,102 @@ * Vendored from https://github.com/promplate/partial-json-parser-js * MIT License - Copyright (c) 2024 promplate * - * This is a recursive descent parser that takes incomplete JSON and returns - * the best-effort parsed result. Useful for rendering streaming JSON as it - * arrives (e.g., AI/LLM token streaming). + * Recursive descent parser for incomplete JSON. Useful for rendering + * streaming JSON as it arrives (e.g., AI/LLM token streaming). */ -// Allow flags to control what incomplete structures are acceptable -export const Allow = { - STR: 0x1, - NUM: 0x2, - ARR: 0x4, - OBJ: 0x8, - NULL: 0x10, - BOOL: 0x20, - NAN: 0x40, - INFINITY: 0x80, - _INFINITY: 0x100, - ALL: - 0x1 | 0x2 | 0x4 | 0x8 | 0x10 | 0x20 | 0x40 | 0x80 | 0x100, -} as const; +export const Allow = { STR: 1, NUM: 2, ARR: 4, OBJ: 8, NULL: 16, BOOL: 32, NAN: 64, INFINITY: 128, _INFINITY: 256, ALL: 511 } as const; -type AllowFlags = number; +const fail = (msg?: string): never => { throw Error(msg); }; +const J = JSON.parse; -class PartialJSON { - private pos: number = 0; - private str: string = ''; - - parse(str: string, allowPartial: AllowFlags = Allow.ALL): unknown { - if (typeof str !== 'string') { - throw new TypeError(`expecting str, got ${typeof str}`); - } - if (!str.trim()) { - throw new Error(`Unexpected end of input`); - } - this.str = str; - this.pos = 0; - return this.parseValue(allowPartial); - } - - private parseValue(allowPartial: AllowFlags): unknown { - this.skipWhitespace(); - if (this.pos >= this.str.length) { - throw new Error('Unexpected end of input'); - } - - const ch = this.str[this.pos]; - - if (ch === '"') return this.parseString(allowPartial); - if (ch === '{') return this.parseObject(allowPartial); - if (ch === '[') return this.parseArray(allowPartial); - if (ch === '-' || (ch >= '0' && ch <= '9')) return this.parseNumber(allowPartial); - if (ch === 't' || ch === 'f') return this.parseBoolean(allowPartial); - if (ch === 'n') return this.parseNull(allowPartial); - if (ch === 'N') return this.parseNaN(allowPartial); - if (ch === 'I') return this.parseInfinity(allowPartial); - - throw new Error(`Unexpected character '${ch}' at position ${this.pos}`); - } - - private skipWhitespace(): void { - while (this.pos < this.str.length && ' \t\n\r'.includes(this.str[this.pos])) { - this.pos++; - } - } - - private parseString(allowPartial: AllowFlags): string { - // Skip opening quote - this.pos++; - let result = ''; - - while (this.pos < this.str.length) { - const ch = this.str[this.pos]; - - if (ch === '\\') { - if (this.pos + 1 >= this.str.length) { - // Incomplete escape at end - if (allowPartial & Allow.STR) return result; - throw new Error('Unexpected end of input in string escape'); - } - const next = this.str[this.pos + 1]; - this.pos += 2; - switch (next) { - case '"': result += '"'; break; - case '\\': result += '\\'; break; - case '/': result += '/'; break; - case 'b': result += '\b'; break; - case 'f': result += '\f'; break; - case 'n': result += '\n'; break; - case 'r': result += '\r'; break; - case 't': result += '\t'; break; - case 'u': { - // Unicode escape: \uXXXX - if (this.pos + 4 > this.str.length) { - // Incomplete unicode escape - if (allowPartial & Allow.STR) return result; - throw new Error('Unexpected end of input in unicode escape'); - } - const hex = this.str.substring(this.pos, this.pos + 4); - const code = parseInt(hex, 16); - if (isNaN(code)) { - throw new Error(`Invalid unicode escape: \\u${hex}`); - } - result += String.fromCharCode(code); - this.pos += 4; - break; - } - default: - result += next; +/** + * Parse a possibly-incomplete JSON string, returning the best-effort result. + */ +export function parsePartialJSON(raw: string, allow: number = 511): unknown { + if (typeof raw !== 'string') throw TypeError(); + const s = raw.trim(); + if (!s) fail(); + const len = s.length; + let i = 0; + const sub = (from: number, to?: number) => s.slice(from, to); + + const lit = (word: string, flag: number, val: unknown): unknown => { + if (s.startsWith(word, i)) { i += word.length; return val; } + if (allow & flag && len - i < word.length && word.startsWith(sub(i))) { i = len; return val; } + fail(); + }; + + const ws = () => { while (i < len && s[i] < '!') i++; }; + + const parse = (): unknown => { + ws(); + i >= len && fail(); + const ch = s[i]; + if (ch === '"') { + const start = i++; + let esc = false; + while (i < len && (s[i] !== '"' || esc)) { esc = s[i] === '\\' ? !esc : false; i++; } + if (s[i] === '"') { + try { return J(sub(start, ++i - +esc)); } catch (e) { fail('' + e); } + } + if (allow & 1) { + try { return J(sub(start, i - +esc) + '"'); } catch { + return J(sub(start, s.lastIndexOf('\\')) + '"'); } - continue; - } - - if (ch === '"') { - this.pos++; // Skip closing quote - return result; - } - - result += ch; - this.pos++; - } - - // Reached end without closing quote - if (allowPartial & Allow.STR) return result; - throw new Error('Unexpected end of input in string'); - } - - private parseNumber(allowPartial: AllowFlags): number { - const start = this.pos; - - if (this.str[this.pos] === '-') { - this.pos++; - if (this.pos >= this.str.length) { - if (allowPartial & Allow.NUM) return 0; - throw new Error('Unexpected end of input in number'); - } - // Check for -Infinity - if (this.str[this.pos] === 'I') { - return this.parseInfinity(allowPartial, true); - } - } - - // Integer part - while (this.pos < this.str.length && this.str[this.pos] >= '0' && this.str[this.pos] <= '9') { - this.pos++; - } - - // Decimal part - if (this.pos < this.str.length && this.str[this.pos] === '.') { - this.pos++; - while (this.pos < this.str.length && this.str[this.pos] >= '0' && this.str[this.pos] <= '9') { - this.pos++; - } - } - - // Exponent part - if (this.pos < this.str.length && (this.str[this.pos] === 'e' || this.str[this.pos] === 'E')) { - this.pos++; - if (this.pos < this.str.length && (this.str[this.pos] === '+' || this.str[this.pos] === '-')) { - this.pos++; - } - while (this.pos < this.str.length && this.str[this.pos] >= '0' && this.str[this.pos] <= '9') { - this.pos++; - } - } - - const numStr = this.str.substring(start, this.pos); - const num = Number(numStr); - - if (isNaN(num)) { - if (allowPartial & Allow.NUM) return 0; - throw new Error(`Invalid number: ${numStr}`); - } - - return num; - } - - private parseBoolean(allowPartial: AllowFlags): boolean { - if (this.str.startsWith('true', this.pos)) { - this.pos += 4; - return true; - } - if (this.str.startsWith('false', this.pos)) { - this.pos += 5; - return false; - } - - // Partial match - if (allowPartial & Allow.BOOL) { - if ('true'.startsWith(this.str.substring(this.pos))) { - this.pos = this.str.length; - return true; - } - if ('false'.startsWith(this.str.substring(this.pos))) { - this.pos = this.str.length; - return false; - } - } - - throw new Error(`Unexpected token at position ${this.pos}`); - } - - private parseNull(allowPartial: AllowFlags): null { - if (this.str.startsWith('null', this.pos)) { - this.pos += 4; - return null; - } - - if (allowPartial & Allow.NULL) { - if ('null'.startsWith(this.str.substring(this.pos))) { - this.pos = this.str.length; - return null; - } - } - - throw new Error(`Unexpected token at position ${this.pos}`); - } - - private parseNaN(allowPartial: AllowFlags): number { - if (this.str.startsWith('NaN', this.pos)) { - this.pos += 3; - return NaN; - } - - if (allowPartial & Allow.NAN) { - if ('NaN'.startsWith(this.str.substring(this.pos))) { - this.pos = this.str.length; - return NaN; - } - } - - throw new Error(`Unexpected token at position ${this.pos}`); - } - - private parseInfinity(allowPartial: AllowFlags, negative: boolean = false): number { - if (this.str.startsWith('Infinity', this.pos)) { - this.pos += 8; - return negative ? -Infinity : Infinity; - } - - const flag = negative ? Allow._INFINITY : Allow.INFINITY; - if (allowPartial & flag) { - if ('Infinity'.startsWith(this.str.substring(this.pos))) { - this.pos = this.str.length; - return negative ? -Infinity : Infinity; } + fail(); } - - throw new Error(`Unexpected token at position ${this.pos}`); - } - - private parseArray(allowPartial: AllowFlags): unknown[] { - // Skip opening bracket - this.pos++; - const arr: unknown[] = []; - - this.skipWhitespace(); - - if (this.pos >= this.str.length) { - if (allowPartial & Allow.ARR) return arr; - throw new Error('Unexpected end of input in array'); - } - - if (this.str[this.pos] === ']') { - this.pos++; - return arr; - } - - while (this.pos < this.str.length) { - this.skipWhitespace(); - if (this.pos >= this.str.length) { - if (allowPartial & Allow.ARR) return arr; - throw new Error('Unexpected end of input in array'); - } - + if (ch === '{') { + i++; ws(); + const obj: Record = {}; try { - const value = this.parseValue(allowPartial); - arr.push(value); - } catch { - if (allowPartial & Allow.ARR) return arr; - throw new Error('Unexpected end of input in array'); - } - - this.skipWhitespace(); - if (this.pos >= this.str.length) { - if (allowPartial & Allow.ARR) return arr; - throw new Error('Unexpected end of input in array'); - } - - if (this.str[this.pos] === ',') { - this.pos++; - continue; - } - - if (this.str[this.pos] === ']') { - this.pos++; - return arr; - } - - if (allowPartial & Allow.ARR) return arr; - throw new Error(`Expected ',' or ']' at position ${this.pos}`); - } - - if (allowPartial & Allow.ARR) return arr; - throw new Error('Unexpected end of input in array'); - } - - private parseObject(allowPartial: AllowFlags): Record { - // Skip opening brace - this.pos++; - const obj: Record = {}; - - this.skipWhitespace(); - - if (this.pos >= this.str.length) { - if (allowPartial & Allow.OBJ) return obj; - throw new Error('Unexpected end of input in object'); - } - - if (this.str[this.pos] === '}') { - this.pos++; + while (s[i] !== '}') { + ws(); + if (i >= len && allow & 8) return obj; + const key = parse() as string; + ws(); i++; + try { obj[key] = parse(); } catch { if (allow & 8) return obj; fail(); } + ws(); + if (s[i] === ',') i++; + } + } catch (e) { if (allow & 8) return obj; throw e; } + i++; return obj; } - - while (this.pos < this.str.length) { - this.skipWhitespace(); - if (this.pos >= this.str.length) { - if (allowPartial & Allow.OBJ) return obj; - throw new Error('Unexpected end of input in object'); - } - - if (this.str[this.pos] !== '"') { - if (allowPartial & Allow.OBJ) return obj; - throw new Error(`Expected '"' at position ${this.pos}`); - } - - let key: string; - try { - key = this.parseString(allowPartial); - } catch { - if (allowPartial & Allow.OBJ) return obj; - throw new Error('Unexpected end of input in object key'); - } - - this.skipWhitespace(); - if (this.pos >= this.str.length) { - // Key parsed but no colon yet — partial object - if (allowPartial & Allow.OBJ) return obj; - throw new Error('Unexpected end of input in object'); - } - - if (this.str[this.pos] !== ':') { - if (allowPartial & Allow.OBJ) return obj; - throw new Error(`Expected ':' at position ${this.pos}`); - } - this.pos++; // Skip colon - - this.skipWhitespace(); - if (this.pos >= this.str.length) { - // Key and colon parsed but no value — partial object - if (allowPartial & Allow.OBJ) return obj; - throw new Error('Unexpected end of input in object value'); - } - - let value: unknown; + if (ch === '[') { + i++; + const arr: unknown[] = []; try { - value = this.parseValue(allowPartial); - } catch { - if (allowPartial & Allow.OBJ) return obj; - throw new Error('Unexpected end of input in object value'); - } - - obj[key] = value; - - this.skipWhitespace(); - if (this.pos >= this.str.length) { - if (allowPartial & Allow.OBJ) return obj; - throw new Error('Unexpected end of input in object'); - } - - if (this.str[this.pos] === ',') { - this.pos++; - continue; - } - - if (this.str[this.pos] === '}') { - this.pos++; - return obj; - } - - if (allowPartial & Allow.OBJ) return obj; - throw new Error(`Expected ',' or '}' at position ${this.pos}`); + while (s[i] !== ']') { + arr.push(parse()); + ws(); + if (s[i] === ',') i++; + } + } catch { if (allow & 4) return arr; fail(); } + i++; + return arr; } - - if (allowPartial & Allow.OBJ) return obj; - throw new Error('Unexpected end of input in object'); - } -} - -const parser = new PartialJSON(); - -/** - * Parse a possibly-incomplete JSON string, returning the best-effort result. - * - * @param str - The (possibly incomplete) JSON string to parse - * @param allowPartial - Bitflags controlling which partial structures are accepted (default: Allow.ALL) - * @returns The parsed value - */ -export function parsePartialJSON(str: string, allowPartial: AllowFlags = Allow.ALL): unknown { - return parser.parse(str, allowPartial); + if (ch === 'n') return lit('null', 16, null); + if (ch === 't') return lit('true', 32, true); + if (ch === 'f') return lit('false', 32, false); + if (ch === 'N') return lit('NaN', 64, NaN); + if (ch === 'I') return lit('Infinity', 128, Infinity); + // number + const start = i; + if (ch === '-') { + i++; + if (i < len && s[i] === 'I') try { return -(lit('Infinity', 256, Infinity) as number); } catch { /* not -Infinity */ } + if (i >= len) { if (allow & 2) return 0; fail(); } + } + while (i < len && ',]}'.indexOf(s[i]) < 0) i++; + const num = sub(start, i); + try { return J(num); } catch { + if (i === len && !(allow & 2)) fail(); + if (num === '-') { if (allow & 2) return 0; fail(); } + try { return J(sub(start, s.lastIndexOf('e'))); } catch (e) { fail('' + e); } + } + }; + + return parse(); } diff --git a/test/realtime/materializer.test.js b/test/realtime/materializer.test.js index 7c2b489c03..2a6608e457 100644 --- a/test/realtime/materializer.test.js +++ b/test/realtime/materializer.test.js @@ -4,6 +4,7 @@ define(['ably', 'shared_helper', 'chai', 'materializer'], function (Ably, Helper const expect = chai.expect; const MessageMaterializer = Materializer.MessageMaterializer; const parsePartialJSON = Materializer.parsePartialJSON; + const Allow = Materializer.Allow; describe('realtime/materializer', function () { this.timeout(60 * 1000); @@ -93,6 +94,129 @@ define(['ably', 'shared_helper', 'chai', 'materializer'], function (Ably, Helper const result = parsePartialJSON('{"emoji": "\\u0041"}'); expect(result).to.deep.equal({ emoji: 'A' }); }); + + // Upstream tests from promplate/partial-json-parser-js + describe('Allow flags (upstream parity)', function () { + it('partial string with STR flag', function () { + expect(parsePartialJSON('"', Allow.STR)).to.equal(''); + expect(parsePartialJSON('"hello', Allow.STR)).to.equal('hello'); + }); + + it('partial string throws without STR flag', function () { + expect(function () { parsePartialJSON('"', ~Allow.STR); }).to.throw(); + }); + + it('partial array with ARR flag', function () { + expect(parsePartialJSON('["', Allow.ARR)).to.deep.equal([]); + expect(parsePartialJSON('["', Allow.ARR | Allow.STR)).to.deep.equal(['']); + }); + + it('partial array throws without ARR flag', function () { + expect(function () { parsePartialJSON('[', Allow.STR); }).to.throw(); + expect(function () { parsePartialJSON('["', Allow.STR); }).to.throw(); + expect(function () { parsePartialJSON('[""', Allow.STR); }).to.throw(); + expect(function () { parsePartialJSON('["",', Allow.STR); }).to.throw(); + }); + + it('partial object with OBJ flag', function () { + expect(parsePartialJSON('{"": "', Allow.OBJ)).to.deep.equal({}); + expect(parsePartialJSON('{"": "', Allow.OBJ | Allow.STR)).to.deep.equal({ '': '' }); + }); + + it('partial object throws without OBJ flag', function () { + expect(function () { parsePartialJSON('{', Allow.STR); }).to.throw(); + expect(function () { parsePartialJSON('{"', Allow.STR); }).to.throw(); + expect(function () { parsePartialJSON('{""', Allow.STR); }).to.throw(); + expect(function () { parsePartialJSON('{"":', Allow.STR); }).to.throw(); + expect(function () { parsePartialJSON('{"":"', Allow.STR); }).to.throw(); + expect(function () { parsePartialJSON('{"":""', Allow.STR); }).to.throw(); + }); + + it('partial singletons with flags', function () { + expect(parsePartialJSON('n', Allow.NULL)).to.equal(null); + expect(function () { parsePartialJSON('n', ~Allow.NULL); }).to.throw(); + + expect(parsePartialJSON('t', Allow.BOOL)).to.equal(true); + expect(function () { parsePartialJSON('t', ~Allow.BOOL); }).to.throw(); + + expect(parsePartialJSON('f', Allow.BOOL)).to.equal(false); + expect(function () { parsePartialJSON('f', ~Allow.BOOL); }).to.throw(); + + expect(parsePartialJSON('I', Allow.INFINITY)).to.equal(Infinity); + expect(function () { parsePartialJSON('I', ~Allow.INFINITY); }).to.throw(); + + expect(parsePartialJSON('-I', Allow._INFINITY)).to.equal(-Infinity); + expect(function () { parsePartialJSON('-I', ~Allow._INFINITY); }).to.throw(); + + expect(Number.isNaN(parsePartialJSON('N', Allow.NAN))).to.equal(true); + expect(function () { parsePartialJSON('N', ~Allow.NAN); }).to.throw(); + }); + + it('number parsing with and without NUM flag', function () { + expect(parsePartialJSON('0', ~Allow.NUM)).to.equal(0); + expect(parsePartialJSON('-1.25e+4', ~Allow.NUM)).to.equal(-1.25e4); + expect(parsePartialJSON('-1.25e+', Allow.NUM)).to.equal(-1.25); + expect(parsePartialJSON('-1.25e', Allow.NUM)).to.equal(-1.25); + }); + }); + + describe('edge cases', function () { + it('handles incomplete escape at end of string', function () { + expect(parsePartialJSON('"hello\\', Allow.STR)).to.equal('hello'); + }); + + it('handles incomplete unicode escape', function () { + expect(parsePartialJSON('"hello\\u00', Allow.STR)).to.equal('hello'); + }); + + it('handles all escape sequences', function () { + expect(parsePartialJSON('"\\"\\\\\\/\\b\\f\\n\\r\\t"')).to.equal('"\\/\b\f\n\r\t'); + }); + + it('handles whitespace around values', function () { + expect(parsePartialJSON(' { "a" : 1 , "b" : 2 } ')).to.deep.equal({ a: 1, b: 2 }); + }); + + it('handles deeply nested partial structures', function () { + var result = parsePartialJSON('{"a": {"b": {"c": {"d": "deep'); + expect(result).to.deep.equal({ a: { b: { c: { d: 'deep' } } } }); + }); + + it('handles array with trailing comma', function () { + expect(parsePartialJSON('[1, 2,', Allow.ARR)).to.deep.equal([1, 2]); + }); + + it('handles object with trailing comma', function () { + expect(parsePartialJSON('{"a": 1,', Allow.OBJ)).to.deep.equal({ a: 1 }); + }); + + it('handles negative numbers', function () { + expect(parsePartialJSON('-', Allow.NUM)).to.equal(0); + expect(parsePartialJSON('-42')).to.equal(-42); + }); + + it('handles scientific notation variants', function () { + expect(parsePartialJSON('1E10')).to.equal(1e10); + expect(parsePartialJSON('1e+10')).to.equal(1e10); + expect(parsePartialJSON('1e-10')).to.equal(1e-10); + }); + + it('handles mixed nested arrays and objects', function () { + var result = parsePartialJSON('[{"a": [1, 2]}, {"b": [3'); + expect(result).to.deep.equal([{ a: [1, 2] }, { b: [3] }]); + }); + + it('rejects invalid input types', function () { + expect(function () { parsePartialJSON(123); }).to.throw(TypeError); + expect(function () { parsePartialJSON(null); }).to.throw(TypeError); + }); + + it('handles complete Infinity and NaN', function () { + expect(parsePartialJSON('Infinity')).to.equal(Infinity); + expect(parsePartialJSON('-Infinity')).to.equal(-Infinity); + expect(Number.isNaN(parsePartialJSON('NaN'))).to.equal(true); + }); + }); }); describe('MessageMaterializer', function () {