diff --git a/pkg/backup/create.go b/pkg/backup/create.go
index 4d80369d..51490015 100644
--- a/pkg/backup/create.go
+++ b/pkg/backup/create.go
@@ -331,6 +331,18 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
 		log.Debug().Msgf("CheckSystemPartsColumnsForTables passed for %d tables", len(tablesToCheck))
 	}
 
+	// Fetch in-progress mutations ONCE for the whole backup. system.mutations scans all tables on
+	// every query, so the previous per-table GetInProgressMutations call was O(N^2) and dominated
+	// create time on installations with many tables. We now do a single scan and look up per table.
+	var allInProgressMutations map[metadata.TableTitle][]metadata.MutationMetadata
+	if b.cfg.ClickHouse.BackupMutations && !schemaOnly && !rbacOnly && !configsOnly && !namedCollectionsOnly {
+		var allInProgressMutationsErr error
+		allInProgressMutations, allInProgressMutationsErr = b.ch.GetInProgressMutationsBatch(ctx)
+		if allInProgressMutationsErr != nil {
+			return errors.Wrap(allInProgressMutationsErr, "b.ch.GetInProgressMutationsBatch")
+		}
+	}
+
 	var backupDataSize, backupObjectDiskSize, backupMetadataSize uint64
 	var metaMutex sync.Mutex
 	createBackupWorkingGroup, createCtx := errgroup.WithContext(ctx)
@@ -372,12 +384,12 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
 			logger.Debug().Msg("get in progress mutations list")
 			inProgressMutations := make([]metadata.MutationMetadata, 0)
 			if b.cfg.ClickHouse.BackupMutations && !schemaOnly && !rbacOnly && !configsOnly && !namedCollectionsOnly {
-				var inProgressMutationsErr error
-				inProgressMutations, inProgressMutationsErr = b.ch.GetInProgressMutations(createCtx, table.Database, table.Name)
-				if inProgressMutationsErr != nil {
-					logger.Error().Msgf("b.ch.GetInProgressMutations error: %v", inProgressMutationsErr)
-					return errors.Wrap(inProgressMutationsErr, "b.ch.GetInProgressMutations")
-				}
+				// Looked up from the single GetInProgressMutationsBatch query above — avoids the
+				// O(N^2) per-table system.mutations scan. A table without in-progress mutations has
+				// no map entry, so keep the empty non-nil default instead of overwriting it with nil.
+				if tableMutations, ok := allInProgressMutations[metadata.TableTitle{Database: table.Database, Table: table.Name}]; ok {
+					inProgressMutations = tableMutations
+				}
 			}
 			logger.Debug().Msg("create metadata")
 			if schemaOnly || doBackupData {
diff --git a/pkg/clickhouse/clickhouse.go b/pkg/clickhouse/clickhouse.go
index e615a081..1c173555 100644
--- a/pkg/clickhouse/clickhouse.go
+++ b/pkg/clickhouse/clickhouse.go
@@ -1363,6 +1363,45 @@ func (ch *ClickHouse) GetInProgressMutations(ctx context.Context, database strin
 	return inProgressMutations, nil
 }
 
+// inProgressMutationRow is one row of the server-wide system.mutations scan issued by
+// GetInProgressMutationsBatch; its fields mirror the columns that query selects.
+type inProgressMutationRow struct {
+	Database   string `ch:"database"`
+	Table      string `ch:"table"`
+	MutationId string `ch:"mutation_id"`
+	Command    string `ch:"command"`
+}
+
+// groupMutationsByTable buckets flat mutation rows into per-table lists keyed by
+// metadata.TableTitle{Database, Table}. A struct key avoids the corner cases of a "database.table"
+// string key (dots are legal in database/table names, so concatenation is ambiguous). Pure (no I/O)
+// so it is unit-testable without a ClickHouse server.
+func groupMutationsByTable(rows []inProgressMutationRow) map[metadata.TableTitle][]metadata.MutationMetadata {
+	result := make(map[metadata.TableTitle][]metadata.MutationMetadata, len(rows))
+	for _, r := range rows {
+		key := metadata.TableTitle{Database: r.Database, Table: r.Table}
+		result[key] = append(result[key], metadata.MutationMetadata{MutationId: r.MutationId, Command: r.Command})
+	}
+	return result
+}
+
+// GetInProgressMutationsBatch returns all in-progress mutations across the whole server in a
+// SINGLE query, keyed by metadata.TableTitle{Database, Table}. system.mutations is an expensive
+// virtual table — every query against it enumerates all tables on the server — so calling
+// GetInProgressMutations once per table is O(N^2) and dominates `create` wall-clock on
+// installations with many tables (observed: ~240ms/call * tens of thousands of tables). Fetching
+// the whole in-progress set once per backup turns that into a single O(N) scan; per-table lookup is
+// then an in-memory map access.
+// Tables with no in-progress mutations have no entry in the returned map.
+func (ch *ClickHouse) GetInProgressMutationsBatch(ctx context.Context) (map[metadata.TableTitle][]metadata.MutationMetadata, error) {
+	var rows []inProgressMutationRow
+	query := "SELECT database, table, mutation_id, command FROM system.mutations WHERE is_done=0"
+	if err := ch.SelectContext(ctx, &rows, query); err != nil {
+		return nil, errors.Wrap(err, "can't get in progress mutations")
+	}
+	return groupMutationsByTable(rows), nil
+}
+
 func (ch *ClickHouse) ApplyMacros(ctx context.Context, s string) (string, error) {
 	if !strings.Contains(s, "{") {
 		return s, nil
diff --git a/pkg/clickhouse/clickhouse_test.go b/pkg/clickhouse/clickhouse_test.go
index abcbc954..85eeaab9 100644
--- a/pkg/clickhouse/clickhouse_test.go
+++ b/pkg/clickhouse/clickhouse_test.go
@@ -3,6 +3,7 @@ package clickhouse
 import (
 	"testing"
 
+	"github.com/Altinity/clickhouse-backup/v2/pkg/metadata"
 	"github.com/go-faster/errors"
 	"github.com/stretchr/testify/assert"
 )
@@ -198,3 +199,50 @@ func TestEnrichQueryWithOnCluster(t *testing.T) {
 		})
 	}
 }
+
+// TestGroupMutationsByTable covers the per-backup batch mutation lookup that replaced the
+// per-table system.mutations query (O(N^2) fix). Verifies that mutations from a single
+// server-wide scan are bucketed to the correct database.table and never leak across tables.
+func TestGroupMutationsByTable(t *testing.T) {
+	rows := []inProgressMutationRow{
+		{Database: "db1", Table: "t1", MutationId: "0000000001", Command: "MODIFY COLUMN a UInt64"},
+		{Database: "db1", Table: "t1", MutationId: "0000000002", Command: "DROP COLUMN b"},
+		{Database: "db1", Table: "t2", MutationId: "0000000003", Command: "MODIFY COLUMN c String"},
+	}
+
+	got := groupMutationsByTable(rows)
+
+	assert.Len(t, got, 2, "two distinct tables expected")
+	assert.Equal(t, []metadata.MutationMetadata{
+		{MutationId: "0000000001", Command: "MODIFY COLUMN a UInt64"},
+		{MutationId: "0000000002", Command: "DROP COLUMN b"},
+	}, got[metadata.TableTitle{Database: "db1", Table: "t1"}], "t1 must keep both of its mutations, in order")
+	assert.Equal(t, []metadata.MutationMetadata{
+		{MutationId: "0000000003", Command: "MODIFY COLUMN c String"},
+	}, got[metadata.TableTitle{Database: "db1", Table: "t2"}], "t2 must get only its own mutation (no cross-table leak)")
+}
+
+// TestGroupMutationsByTableDottedNames pins the reason for the metadata.TableTitle struct key:
+// dots are legal in database/table names, so a "database.table" string key would collapse
+// {db="a.b", table="c"} and {db="a", table="b.c"} into the same "a.b.c" bucket. The struct key
+// keeps them separate.
+func TestGroupMutationsByTableDottedNames(t *testing.T) {
+	rows := []inProgressMutationRow{
+		{Database: "a.b", Table: "c", MutationId: "0000000001", Command: "DROP COLUMN x"},
+		{Database: "a", Table: "b.c", MutationId: "0000000002", Command: "DROP COLUMN y"},
+	}
+
+	got := groupMutationsByTable(rows)
+
+	assert.Len(t, got, 2, "ambiguous string key would have merged these into one bucket")
+	assert.Equal(t, []metadata.MutationMetadata{
+		{MutationId: "0000000001", Command: "DROP COLUMN x"},
+	}, got[metadata.TableTitle{Database: "a.b", Table: "c"}])
+	assert.Equal(t, []metadata.MutationMetadata{
+		{MutationId: "0000000002", Command: "DROP COLUMN y"},
+	}, got[metadata.TableTitle{Database: "a", Table: "b.c"}])
+}
+
+func TestGroupMutationsByTableEmpty(t *testing.T) {
+	assert.Empty(t, groupMutationsByTable(nil))
+}