Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/cluster-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"chkit": patch
"@chkit/core": patch
---

Add ClickHouse cluster support. Set `clickhouse.cluster` in your config to run all generated DDL `ON CLUSTER <name>` and store the migration journal in a replicated engine — for self-managed multi-node clusters. Your table engines are passed through unchanged (declare `ReplicatedMergeTree` yourself). Leave `cluster` unset for single-node, ClickHouse Cloud, or ObsessionDB, where replication is automatic and `ON CLUSTER` is unnecessary.
22 changes: 22 additions & 0 deletions apps/docs/src/content/docs/configuration/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,28 @@ export default defineConfig({
})
```

## Cluster mode (`ON CLUSTER`)

For self-managed multi-node ClickHouse clusters, set `clickhouse.cluster` to the cluster name from your server's `remote_servers` config:

```ts
clickhouse: {
url: process.env.CLICKHOUSE_URL ?? 'http://localhost:9000',
password: process.env.CLICKHOUSE_PASSWORD ?? '',
database: 'default',
cluster: 'my_cluster',
},
```

When `cluster` is set, chkit:

- emits every generated DDL statement with an `ON CLUSTER <name>` clause, so `generate` bakes it into the migration files and `migrate` propagates each change to all nodes via ClickHouse's distributed DDL queue, and
- creates its migration journal (`_chkit_migrations`) as a `ReplicatedReplacingMergeTree`, keeping applied-migration history consistent across every node — so running `migrate` against a load-balanced endpoint never re-applies migrations.

chkit does **not** rewrite your table engines: declare `ReplicatedMergeTree` (or another `Replicated*`/`Shared*` variant) yourself for tables whose data should replicate. Empty-argument engines (e.g. `ENGINE = ReplicatedMergeTree`) are recommended so the server's `default_replica_path` supplies a collision-free Keeper path, which also keeps drop-and-recreate safe.

Leave `cluster` unset for single-node servers, ClickHouse Cloud, or ObsessionDB, where replication is automatic (SharedMergeTree) and `ON CLUSTER` is unnecessary. The value accepts an identifier (`my_cluster`) or a macro (`{cluster}`) when your nodes define one.

## User profile config fallback

Project-scoped commands (`generate`, `migrate`, `status`, `drift`, `check`, `codegen`, `pull`) always require a project config in the working directory.
Expand Down
8 changes: 8 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
"test": "turbo run test",
"test:turbo": "turbo run test",
"test:env": "doppler run --project chkit --config ci -- bun run test",
"cluster:up": "docker compose -f test/cluster/docker-compose.yml up -d --wait",
"cluster:down": "docker compose -f test/cluster/docker-compose.yml down -v",
"cluster:logs": "docker compose -f test/cluster/docker-compose.yml logs -f",
"cluster:verify": "bash test/cluster/verify.sh",
"test:cluster": "bun test packages/cli/test/cluster.e2e.test.ts",
"cluster:2shard:up": "docker compose -f test/cluster/2shard/docker-compose.yml up -d --wait",
"cluster:2shard:down": "docker compose -f test/cluster/2shard/docker-compose.yml down -v",
"test:cluster:2shard": "bun test packages/cli/test/cluster-2shard.e2e.test.ts",
"fallow": "bunx fallow",
"fallow:dead-code": "bunx fallow --only dead-code",
"fallow:pr": "bunx fallow --changed-since main --summary || true",
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/check/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
name: 'check',
description: 'Run policy checks for CI and release gates',
flags: [STRICT_FLAG],
async run(context): Promise<undefined | number> {

Check failure on line 26 in packages/cli/src/commands/check/command.ts

View workflow job for this annotation

GitHub Actions / verify

High CRAP score (critical)

Function 'run' has a CRAP score of 272.0 (threshold: 30.0). • Severity: critical • Cyclomatic: 16 • Cognitive: 13 • CRAP: 272.0 (threshold: 30.0) • Lines: 99 CRAP combines complexity with coverage: high CRAP means changes here carry high risk. Consider adding tests, simplifying the function, or both.
const { flags, config, configPath, pluginRuntime, pluginContext } = context
const f = typedFlags(flags, [...GLOBAL_FLAGS, STRICT_FLAG] as const)
const strict = f['--strict'] === true
Expand All @@ -39,7 +39,7 @@
const db = pluginContext.executor
const database = config.clickhouse?.database

const journalStore = createJournalStore(db)
const journalStore = createJournalStore(db, config.clickhouse?.cluster)
const files = await listMigrations(migrationsDir)
const journal = await journalStore.readJournal()
const databaseMissing = journalStore.databaseMissing
Expand Down
6 changes: 5 additions & 1 deletion packages/cli/src/commands/generate/command.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { generateArtifacts } from '@chkit/codegen'
import { ChxValidationError, planDiff } from '@chkit/core'
import { applyOnClusterToPlan, ChxValidationError, planDiff } from '@chkit/core'

import { defineFlags, typedFlags, type ChxPluginCommand } from '../../plugins.js'
import { resolveDirs } from '../../runtime/config.js'
Expand Down Expand Up @@ -50,7 +50,7 @@
run: cmdGenerate,
}

async function cmdGenerate(ctx: import('../../plugins.js').ChxPluginCommandContext): Promise<undefined | number> {

Check failure on line 53 in packages/cli/src/commands/generate/command.ts

View workflow job for this annotation

GitHub Actions / verify

High CRAP score (critical)

Function 'cmdGenerate' has a CRAP score of 552.0 (threshold: 30.0). • Severity: critical • Cyclomatic: 23 • Cognitive: 28 • CRAP: 552.0 (threshold: 30.0) • Lines: 163 CRAP combines complexity with coverage: high CRAP means changes here carry high risk. Consider adding tests, simplifying the function, or both.
const { flags, config, configPath, pluginRuntime } = ctx
const dirs = resolveDirs(config)
const f = typedFlags(flags, [...GLOBAL_FLAGS, ...GENERATE_FLAGS] as const)
Expand Down Expand Up @@ -163,6 +163,10 @@
}).plan
}

// Cluster mode: stamp `ON CLUSTER <name>` onto every DDL statement as a final
// post-pass, after all plan transforms (renames, plugins, scope filtering).
plan = applyOnClusterToPlan(plan, config.clickhouse?.cluster)

if (planMode) {
emitGeneratePlanOutput(plan, jsonMode, resolvedScope)
return 0
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/migrate/command.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import { mkdir, readFile } from 'node:fs/promises'
import { join } from 'node:path'

Expand Down Expand Up @@ -42,7 +42,7 @@
run: cmdMigrate,
}

async function cmdMigrate(

Check failure on line 45 in packages/cli/src/commands/migrate/command.ts

View workflow job for this annotation

GitHub Actions / verify

High CRAP score (critical)

Function 'cmdMigrate' has a CRAP score of 2352.0 (threshold: 30.0). • Severity: critical • Cyclomatic: 48 • Cognitive: 71 • CRAP: 2352.0 (threshold: 30.0) • Lines: 207 CRAP combines complexity with coverage: high CRAP means changes here carry high risk. Consider adding tests, simplifying the function, or both.
runCtx: import('../../plugins.js').ChxPluginCommandContext,
): Promise<undefined | number> {
const { flags, config, configPath, pluginRuntime, pluginContext } = runCtx
Expand All @@ -59,7 +59,7 @@
throw new Error('clickhouse config is required for migrate (journal is stored in ClickHouse)')
}
const db = pluginContext.executor
const journalStore = createJournalStore(db)
const journalStore = createJournalStore(db, config.clickhouse?.cluster)
const snapshot = await readSnapshot(metaDir)
const tableScope = resolveTableScope(tableSelector, tableKeysFromDefinitions(snapshot?.definitions ?? []))
const mode = executeRequested ? 'execute' : 'plan'
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
name: 'status',
description: 'Show migration status and checksum mismatch information',
flags: [],
async run(context): Promise<undefined | number> {

Check failure on line 14 in packages/cli/src/commands/status.ts

View workflow job for this annotation

GitHub Actions / verify

High CRAP score (critical)

Function 'run' has a CRAP score of 156.0 (threshold: 30.0). • Severity: critical • Cyclomatic: 12 • Cognitive: 11 • CRAP: 156.0 (threshold: 30.0) • Lines: 63 CRAP combines complexity with coverage: high CRAP means changes here carry high risk. Consider adding tests, simplifying the function, or both.
const { flags, config, pluginContext } = context
const jsonMode = flags['--json'] === true
const { migrationsDir } = resolveDirs(config)
Expand All @@ -21,7 +21,7 @@
}
const db = pluginContext.executor
const database = config.clickhouse?.database
const journalStore = createJournalStore(db)
const journalStore = createJournalStore(db, config.clickhouse?.cluster)

await mkdir(migrationsDir, { recursive: true })
const files = await listMigrations(migrationsDir)
Expand Down
25 changes: 19 additions & 6 deletions packages/cli/src/runtime/journal-store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { isUnknownDatabaseError, type ClickHouseExecutor } from '@chkit/clickhouse'
import { onClusterClause } from '@chkit/core'

import type { MigrationJournal, MigrationJournalEntry } from './migration-store.js'
import { CLI_VERSION } from './version.js'
Expand Down Expand Up @@ -141,17 +142,29 @@ function parseOperations(value: unknown): OperationState[] {
return decoded.map((row) => operationFromTuple(row as OperationTupleRow))
}

export function createJournalStore(db: ClickHouseExecutor): JournalStore {
export function createJournalStore(db: ClickHouseExecutor, cluster?: string): JournalStore {
const journalTable = resolveJournalTableName()
debug('journal', `journal table: ${journalTable}${process.env.CHKIT_JOURNAL_TABLE ? ' (from CHKIT_JOURNAL_TABLE)' : ''}`)
const createTableSql = `CREATE TABLE IF NOT EXISTS ${journalTable} (
debug('journal', `journal table: ${journalTable}${process.env.CHKIT_JOURNAL_TABLE ? ' (from CHKIT_JOURNAL_TABLE)' : ''}${cluster ? ` (ON CLUSTER ${cluster})` : ''}`)
// In cluster mode the journal must be consistent across every node, so it uses
// a replicated engine with a no-`{shard}` Keeper path (one cluster-wide group)
// created `ON CLUSTER`. The replica id is `{shard}_{replica}` rather than bare
// `{replica}`: because the path omits `{shard}`, all nodes across all shards
// share it, so the replica name must be unique cluster-wide — and per-shard
// `{replica}` naming (the common multi-shard layout) would otherwise collide
// (REPLICA_ALREADY_EXISTS). The read path already uses SYNC REPLICA + FINAL +
// sequential consistency. Single-node/Cloud keeps the plain engine unchanged.
const onCluster = onClusterClause(cluster)
const journalEngine = cluster
? "ReplicatedReplacingMergeTree('/clickhouse/tables/{database}/{table}', '{shard}_{replica}', applied_at)"
: 'ReplacingMergeTree(applied_at)'
const createTableSql = `CREATE TABLE IF NOT EXISTS ${journalTable}${onCluster} (
name String,
applied_at DateTime64(3, 'UTC'),
checksum String,
chkit_version String,
migration_completed Bool DEFAULT true,
operations ${OPERATIONS_TUPLE_TYPE} DEFAULT []
) ENGINE = ReplacingMergeTree(applied_at)
) ENGINE = ${journalEngine}
ORDER BY (name)
SETTINGS index_granularity = 1`

Expand Down Expand Up @@ -205,10 +218,10 @@ SETTINGS index_granularity = 1`
// of the new chkit. ALTER ADD COLUMN IF NOT EXISTS is a metadata op,
// no data rewrite.
await db.command(
`ALTER TABLE ${journalTable} ADD COLUMN IF NOT EXISTS migration_completed Bool DEFAULT true`,
`ALTER TABLE ${journalTable}${onCluster} ADD COLUMN IF NOT EXISTS migration_completed Bool DEFAULT true`,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject existing local journals in cluster mode

When clickhouse.cluster is added to a project that already has the old local _chkit_migrations table, this upgrade path only runs ALTER ... ON CLUSTER and never creates or converts the table to the new replicated engine. That leaves the journal non-replicated (or the distributed ALTER fails on replicas where the old table is absent), so status/migrate through another cluster node can miss applied migrations and replay them; please detect a pre-existing non-ReplicatedReplacingMergeTree journal and fail with migration instructions or migrate it before marking the store bootstrapped.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 4554f90. In cluster mode, ensureTable now checks the existing journal's engine first and fails fast with migration guidance (drop ON CLUSTER, or set CHKIT_JOURNAL_TABLE) if it isn't a replicated engine — before any ON CLUSTER ALTERs run. Added a cluster e2e case covering a pre-existing non-replicated journal.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up: we've intentionally reverted this guard (d2223ce). It only protected the single-node-journal → enable-cluster migration path, which can't occur yet — chkit has no cluster users, and new adopters create the journal as ReplicatedReplacingMergeTree from the first migrate. Rather than carry a probe + error path for a scenario that doesn't exist, we'll design a proper migration if/when a real single-node → cluster upgrade comes up.

)
await db.command(
`ALTER TABLE ${journalTable} ADD COLUMN IF NOT EXISTS operations ${OPERATIONS_TUPLE_TYPE} DEFAULT []`,
`ALTER TABLE ${journalTable}${onCluster} ADD COLUMN IF NOT EXISTS operations ${OPERATIONS_TUPLE_TYPE} DEFAULT []`,
)
}

Expand Down
98 changes: 98 additions & 0 deletions packages/cli/test/cluster-2shard.e2e.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { describe, expect, test } from 'bun:test'
import { mkdtemp, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'

import {
CORE_ENTRY,
createJournalTableName,
createLiveExecutor,
createPrefix,
formatTestDiagnostic,
runCli,
waitForTable,
} from '../src/test/e2e-testkit.js'

// MULTI-SHARD cluster-mode e2e against test/cluster/2shard (2 shards x 2 replicas,
// replicas named per-shard so the journal's replica id must be unique cluster-wide).
//
// This is the topology that exposes a no-`{shard}` journal path: with bare
// `{replica}` the journal CREATE collides (REPLICA_ALREADY_EXISTS) on the second
// shard; chkit uses `{shard}_{replica}` so one consistent journal spans all shards.
//
// Outside src/, excluded from default CI. Run via `bun run test:cluster:2shard`
// after `bun run cluster:2shard:up`. Hard-fails (never skips) if unreachable.
const NODES = (
process.env.CHKIT_CLUSTER_2S_NODES ??
'http://localhost:8133,http://localhost:8134,http://localhost:8135,http://localhost:8136'
).split(',')
const CLUSTER = process.env.CHKIT_CLUSTER_2S ?? 'test_cluster_2s'
const USER = process.env.CLICKHOUSE_USER ?? 'default'
const PASSWORD = process.env.CLICKHOUSE_PASSWORD ?? 'clusterpass'
const DATABASE = process.env.CLICKHOUSE_DB ?? 'default'

// NODES[0] is a shard-1 node (primary endpoint); NODES[2] is a shard-2 node.
const SHARD1_ENDPOINT = NODES[0] as string
const SHARD2_ENDPOINT = NODES[2] as string

function executorFor(url: string) {
return createLiveExecutor({ clickhouseUrl: url, clickhouseUser: USER, clickhousePassword: PASSWORD, clickhouseDatabase: DATABASE })
}

function configFor(schemaPath: string, dir: string, url: string): string {
return `export default {
schema: '${schemaPath}',
outDir: '${join(dir, 'chkit')}',
migrationsDir: '${join(dir, 'chkit', 'migrations')}',
metaDir: '${join(dir, 'chkit', 'meta')}',
clickhouse: { url: '${url}', username: '${USER}', password: '${PASSWORD}', database: '${DATABASE}', cluster: '${CLUSTER}' },
}
`
}

describe('chkit cluster mode (ON CLUSTER) — multi-shard journal', () => {
test('journal forms one consistent group across all shards', async () => {
const table = `${createPrefix('twoshard')}events`
const journalTable = createJournalTableName('twoshard')
const cliEnv = { CHKIT_JOURNAL_TABLE: journalTable }

const dir = await mkdtemp(join(tmpdir(), 'chkit-cluster-2s-e2e-'))
const schemaPath = join(dir, 'schema.ts')
const configS1 = join(dir, 'clickhouse.config.ts')
const configS2 = join(dir, 'clickhouse.shard2.ts')
await writeFile(
schemaPath,
`import { schema, table } from '${CORE_ENTRY}'\n\nexport default schema(table({\n database: '${DATABASE}', name: '${table}', engine: 'ReplicatedMergeTree',\n columns: [{ name: 'id', type: 'UInt64' }], primaryKey: ['id'], orderBy: ['id'],\n}))\n`,
'utf8',
)
await writeFile(configS1, configFor(schemaPath, dir, SHARD1_ENDPOINT), 'utf8')
await writeFile(configS2, configFor(schemaPath, dir, SHARD2_ENDPOINT), 'utf8')

const generate = runCli(dir, ['generate', '--config', configS1, '--name', 'init', '--json'], cliEnv)
expect(generate.exitCode, formatTestDiagnostic('generate', generate)).toBe(0)
const migrate = runCli(dir, ['migrate', '--config', configS1, '--execute', '--json'], cliEnv)
expect(migrate.exitCode, formatTestDiagnostic('migrate', migrate)).toBe(0)

// The journal (replicated) and the table must exist on EVERY node across BOTH
// shards — with bare `{replica}` the journal would be missing on shard-1 nodes.
const executors = NODES.map(executorFor)
for (const node of executors) {
await waitForTable(node, DATABASE, journalTable)
await waitForTable(node, DATABASE, table)
const rows = await node.query<{ engine: string }>(
`SELECT engine FROM system.tables WHERE database = '${DATABASE}' AND name = '${journalTable}'`,
)
expect(rows[0]?.engine).toBe('ReplicatedReplacingMergeTree')
}

// Cross-shard: chkit run against a shard-2 endpoint sees the journal written
// through the shard-1 endpoint — nothing pending.
const planOnShard2 = runCli(dir, ['migrate', '--config', configS2, '--json'], cliEnv)
expect(planOnShard2.exitCode, formatTestDiagnostic('plan on shard2', planOnShard2)).toBe(0)
expect((JSON.parse(planOnShard2.stdout) as { pending: string[] }).pending).toEqual([])

const cleanup = executors[0] as ReturnType<typeof executorFor>
await cleanup.command(`DROP TABLE IF EXISTS ${DATABASE}.${table} ON CLUSTER '${CLUSTER}' SYNC`)
await cleanup.command(`DROP TABLE IF EXISTS ${DATABASE}.\`${journalTable}\` ON CLUSTER '${CLUSTER}' SYNC`)
}, 90_000)
})
Loading
Loading