Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions pkg/backup/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,18 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
log.Debug().Msgf("CheckSystemPartsColumnsForTables passed for %d tables", len(tablesToCheck))
}

// Fetch in-progress mutations ONCE for the whole backup. system.mutations scans all tables on
// every query, so the previous per-table GetInProgressMutations call was O(N^2) and dominated
// create time on installations with many tables. We now do a single scan and look up per table.
var allInProgressMutations map[metadata.TableTitle][]metadata.MutationMetadata
if b.cfg.ClickHouse.BackupMutations && !schemaOnly && !rbacOnly && !configsOnly && !namedCollectionsOnly {
var allInProgressMutationsErr error
allInProgressMutations, allInProgressMutationsErr = b.ch.GetInProgressMutationsBatch(ctx)
if allInProgressMutationsErr != nil {
return errors.Wrap(allInProgressMutationsErr, "b.ch.GetInProgressMutationsBatch")
}
}

var backupDataSize, backupObjectDiskSize, backupMetadataSize uint64
var metaMutex sync.Mutex
createBackupWorkingGroup, createCtx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -372,12 +384,9 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
logger.Debug().Msg("get in progress mutations list")
inProgressMutations := make([]metadata.MutationMetadata, 0)
if b.cfg.ClickHouse.BackupMutations && !schemaOnly && !rbacOnly && !configsOnly && !namedCollectionsOnly {
var inProgressMutationsErr error
inProgressMutations, inProgressMutationsErr = b.ch.GetInProgressMutations(createCtx, table.Database, table.Name)
if inProgressMutationsErr != nil {
logger.Error().Msgf("b.ch.GetInProgressMutations error: %v", inProgressMutationsErr)
return errors.Wrap(inProgressMutationsErr, "b.ch.GetInProgressMutations")
}
// looked up from the single GetInProgressMutationsBatch query above — avoids the
// O(N^2) per-table system.mutations scan.
inProgressMutations = allInProgressMutations[metadata.TableTitle{Database: table.Database, Table: table.Name}]
}
logger.Debug().Msg("create metadata")
if schemaOnly || doBackupData {
Expand Down
36 changes: 36 additions & 0 deletions pkg/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,42 @@ func (ch *ClickHouse) GetInProgressMutations(ctx context.Context, database strin
return inProgressMutations, nil
}

// GetInProgressMutationsBatch returns all in-progress mutations across the whole server in a
// SINGLE query, keyed by metadata.TableTitle{Database, Table}. system.mutations is an expensive
// virtual table — every query against it enumerates all tables on the server — so calling
// GetInProgressMutations once per table is O(N^2) and dominates `create` wall-clock on
// installations with many tables (observed: ~240ms/call * tens of thousands of tables). Fetching
// the whole in-progress set once per backup turns that into a single O(N) scan; per-table lookup is
// then an in-memory map access.
type inProgressMutationRow struct {
Database string `ch:"database"`
Table string `ch:"table"`
MutationId string `ch:"mutation_id"`
Command string `ch:"command"`
}

// groupMutationsByTable buckets flat mutation rows into per-table lists keyed by
// metadata.TableTitle{Database, Table}. A struct key avoids the corner cases of a "database.table"
// string key (dots are legal in database/table names, so concatenation is ambiguous). Pure (no I/O)
// so it is unit-testable without a ClickHouse server.
func groupMutationsByTable(rows []inProgressMutationRow) map[metadata.TableTitle][]metadata.MutationMetadata {
result := make(map[metadata.TableTitle][]metadata.MutationMetadata, len(rows))
for _, r := range rows {
key := metadata.TableTitle{Database: r.Database, Table: r.Table}
result[key] = append(result[key], metadata.MutationMetadata{MutationId: r.MutationId, Command: r.Command})
}
return result
}

func (ch *ClickHouse) GetInProgressMutationsBatch(ctx context.Context) (map[metadata.TableTitle][]metadata.MutationMetadata, error) {
var rows []inProgressMutationRow
query := "SELECT database, table, mutation_id, command FROM system.mutations WHERE is_done=0"
Comment thread
Slach marked this conversation as resolved.
if err := ch.SelectContext(ctx, &rows, query); err != nil {
return nil, errors.Wrap(err, "can't get in progress mutations")
}
return groupMutationsByTable(rows), nil
}

func (ch *ClickHouse) ApplyMacros(ctx context.Context, s string) (string, error) {
if !strings.Contains(s, "{") {
return s, nil
Expand Down
48 changes: 48 additions & 0 deletions pkg/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clickhouse
import (
"testing"

"github.com/Altinity/clickhouse-backup/v2/pkg/metadata"
"github.com/go-faster/errors"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -198,3 +199,50 @@ func TestEnrichQueryWithOnCluster(t *testing.T) {
})
}
}

// TestGroupMutationsByTable covers the per-backup batch mutation lookup that replaced the
// per-table system.mutations query (O(N^2) fix). Verifies that mutations from a single
// server-wide scan are bucketed to the correct database.table and never leak across tables.
func TestGroupMutationsByTable(t *testing.T) {
rows := []inProgressMutationRow{
{Database: "db1", Table: "t1", MutationId: "0000000001", Command: "MODIFY COLUMN a UInt64"},
{Database: "db1", Table: "t1", MutationId: "0000000002", Command: "DROP COLUMN b"},
{Database: "db1", Table: "t2", MutationId: "0000000003", Command: "MODIFY COLUMN c String"},
}

got := groupMutationsByTable(rows)

assert.Len(t, got, 2, "two distinct tables expected")
assert.Equal(t, []metadata.MutationMetadata{
{MutationId: "0000000001", Command: "MODIFY COLUMN a UInt64"},
{MutationId: "0000000002", Command: "DROP COLUMN b"},
}, got[metadata.TableTitle{Database: "db1", Table: "t1"}], "t1 must keep both of its mutations, in order")
assert.Equal(t, []metadata.MutationMetadata{
{MutationId: "0000000003", Command: "MODIFY COLUMN c String"},
}, got[metadata.TableTitle{Database: "db1", Table: "t2"}], "t2 must get only its own mutation (no cross-table leak)")
}

// TestGroupMutationsByTableDottedNames pins the reason for the metadata.TableTitle struct key:
// dots are legal in database/table names, so a "database.table" string key would collapse
// {db="a.b", table="c"} and {db="a", table="b.c"} into the same "a.b.c" bucket. The struct key
// keeps them separate.
func TestGroupMutationsByTableDottedNames(t *testing.T) {
rows := []inProgressMutationRow{
{Database: "a.b", Table: "c", MutationId: "0000000001", Command: "DROP COLUMN x"},
{Database: "a", Table: "b.c", MutationId: "0000000002", Command: "DROP COLUMN y"},
}

got := groupMutationsByTable(rows)

assert.Len(t, got, 2, "ambiguous string key would have merged these into one bucket")
assert.Equal(t, []metadata.MutationMetadata{
{MutationId: "0000000001", Command: "DROP COLUMN x"},
}, got[metadata.TableTitle{Database: "a.b", Table: "c"}])
assert.Equal(t, []metadata.MutationMetadata{
{MutationId: "0000000002", Command: "DROP COLUMN y"},
}, got[metadata.TableTitle{Database: "a", Table: "b.c"}])
}

func TestGroupMutationsByTableEmpty(t *testing.T) {
assert.Empty(t, groupMutationsByTable(nil))
}