diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 9ac6fb08e2..49eade5507 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -174,24 +174,6 @@ public class AmoroManagementConf { .defaultValue(Duration.ofHours(1)) .withDescription("Interval for expiring snapshots."); - public static final ConfigOption CLEAN_ORPHAN_FILES_ENABLED = - ConfigOptions.key("clean-orphan-files.enabled") - .booleanType() - .defaultValue(true) - .withDescription("Enable orphan files cleaning."); - - public static final ConfigOption CLEAN_ORPHAN_FILES_THREAD_COUNT = - ConfigOptions.key("clean-orphan-files.thread-count") - .intType() - .defaultValue(10) - .withDescription("The number of threads used for orphan files cleaning."); - - public static final ConfigOption CLEAN_ORPHAN_FILES_INTERVAL = - ConfigOptions.key("clean-orphan-files.interval") - .durationType() - .defaultValue(Duration.ofDays(1)) - .withDescription("Interval for cleaning orphan files."); - public static final ConfigOption CLEAN_DANGLING_DELETE_FILES_ENABLED = ConfigOptions.key("clean-dangling-delete-files.enabled") .booleanType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java index 1e78520f07..627b577d68 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java @@ -86,9 +86,6 @@ public static void validateConfig(Configurations configurations) { validateThreadCount(configurations, AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT); } - if (configurations.getBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) { - validateThreadCount(configurations, AmoroManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT); - } if (configurations.getBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED)) { validateThreadCount(configurations, AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 0be364523a..d174e559ef 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -290,7 +290,6 @@ public void startOptimizingService() throws Exception { addHandlerChain(optimizingService.getTableRuntimeHandler()); addHandlerChain(processService.getTableHandlerChain()); addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor()); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java index 18de241db8..1f78134764 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -57,6 +57,14 @@ public class IcebergProcessFactory implements ProcessFactory { .durationType() .defaultValue(Duration.ofHours(1)); + public static final ConfigOption ORPHAN_FILES_CLEANING_ENABLED = + ConfigOptions.key("clean-orphan-files.enabled").booleanType().defaultValue(true); + + public static final ConfigOption ORPHAN_FILES_CLEANING_INTERVAL = + ConfigOptions.key("clean-orphan-files.interval") + .durationType() + .defaultValue(Duration.ofDays(1)); + private ExecuteEngine localEngine; private final Map actions = Maps.newHashMap(); private final List formats = @@ -91,7 +99,10 @@ public Optional trigger(TableRuntime tableRuntime, Action action) if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) { return triggerExpireSnapshot(tableRuntime); + } else if (IcebergActions.DELETE_ORPHANS.equals(action)) { + return triggerCleanOrphans(tableRuntime); } + return Optional.empty(); } @@ -113,6 +124,12 @@ public void open(Map properties) { this.actions.put( IcebergActions.EXPIRE_SNAPSHOTS, ProcessTriggerStrategy.triggerAtFixRate(interval)); } + + if (configs.getBoolean(ORPHAN_FILES_CLEANING_ENABLED)) { + Duration interval = configs.getDuration(ORPHAN_FILES_CLEANING_INTERVAL); + this.actions.put( + IcebergActions.DELETE_ORPHANS, ProcessTriggerStrategy.triggerAtFixRate(interval)); + } } private Optional triggerExpireSnapshot(TableRuntime tableRuntime) { @@ -130,6 +147,21 @@ private Optional triggerExpireSnapshot(TableRuntime tableRuntime) return Optional.of(new SnapshotsExpiringProcess(tableRuntime, localEngine)); } + private Optional triggerCleanOrphans(TableRuntime tableRuntime) { + if (localEngine == null || !tableRuntime.getTableConfiguration().isCleanOrphanEnabled()) { + return Optional.empty(); + } + + long lastExecuteTime = + tableRuntime.getState(DefaultTableRuntime.CLEANUP_STATE_KEY).getLastOrphanFilesCleanTime(); + ProcessTriggerStrategy strategy = actions.get(IcebergActions.DELETE_ORPHANS); + if (System.currentTimeMillis() - lastExecuteTime < strategy.getTriggerInterval().toMillis()) { + return Optional.empty(); + } + + return Optional.of(new OrphanFilesCleaningProcess(tableRuntime, localEngine)); + } + @Override public void close() {} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java new file mode 100644 index 0000000000..22e4c05ee8 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.iceberg; + +import org.apache.amoro.Action; +import org.apache.amoro.AmoroTable; +import org.apache.amoro.IcebergActions; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.maintainer.TableMaintainer; +import org.apache.amoro.process.ExecuteEngine; +import org.apache.amoro.process.LocalProcess; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.optimizing.maintainer.TableMaintainerFactory; +import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** Local table process for cleaning Iceberg orphan files. */ +public class OrphanFilesCleaningProcess extends TableProcess implements LocalProcess { + + private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesCleaningProcess.class); + + public OrphanFilesCleaningProcess(TableRuntime tableRuntime, ExecuteEngine engine) { + super(tableRuntime, engine); + } + + @Override + public String tag() { + return getAction().getName().toLowerCase(); + } + + @Override + public void run() { + try { + AmoroTable amoroTable = tableRuntime.loadTable(); + TableMaintainer tableMaintainer = TableMaintainerFactory.create(amoroTable, tableRuntime); + tableMaintainer.cleanOrphanFiles(); + tableRuntime.updateState( + DefaultTableRuntime.CLEANUP_STATE_KEY, + cleanUp -> cleanUp.setLastOrphanFilesCleanTime(System.currentTimeMillis())); + } catch (Throwable t) { + LOG.error("Failed to clean orphan files for table {}", tableRuntime.getTableIdentifier(), t); + } + } + + @Override + public Action getAction() { + return IcebergActions.DELETE_ORPHANS; + } + + @Override + public Map getProcessParameters() { + return Maps.newHashMap(); + } + + @Override + public Map getSummary() { + return Maps.newHashMap(); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java index 173c0259e2..4e5b4beb35 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java @@ -28,7 +28,6 @@ public class InlineTableExecutors { private static final InlineTableExecutors instance = new InlineTableExecutors(); private TableRuntimeRefreshExecutor tableRefreshingExecutor; - private OrphanFilesCleaningExecutor orphanFilesCleaningExecutor; private DanglingDeleteFilesCleaningExecutor danglingDeleteFilesCleaningExecutor; private BlockerExpiringExecutor blockerExpiringExecutor; private OptimizingCommitExecutor optimizingCommitExecutor; @@ -42,13 +41,6 @@ public static InlineTableExecutors getInstance() { } public void setup(TableService tableService, Configurations conf) { - if (conf.getBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) { - this.orphanFilesCleaningExecutor = - new OrphanFilesCleaningExecutor( - tableService, - conf.getInteger(AmoroManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT), - conf.get(AmoroManagementConf.CLEAN_ORPHAN_FILES_INTERVAL)); - } if (conf.getBoolean(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_ENABLED)) { this.danglingDeleteFilesCleaningExecutor = new DanglingDeleteFilesCleaningExecutor( @@ -108,10 +100,6 @@ public TableRuntimeRefreshExecutor getTableRefreshingExecutor() { return tableRefreshingExecutor; } - public OrphanFilesCleaningExecutor getOrphanFilesCleaningExecutor() { - return orphanFilesCleaningExecutor; - } - public DanglingDeleteFilesCleaningExecutor getDanglingDeleteFilesCleaningExecutor() { return danglingDeleteFilesCleaningExecutor; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java deleted file mode 100644 index 19db0f1929..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.scheduler.inline; - -import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableRuntime; -import org.apache.amoro.config.TableConfiguration; -import org.apache.amoro.maintainer.TableMaintainer; -import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; -import org.apache.amoro.server.scheduler.PeriodicTableScheduler; -import org.apache.amoro.server.table.TableService; -import org.apache.amoro.server.table.cleanup.CleanupOperation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; - -public class OrphanFilesCleaningExecutor extends PeriodicTableScheduler { - private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesCleaningExecutor.class); - private final Duration interval; - - public OrphanFilesCleaningExecutor(TableService tableService, int poolSize, Duration interval) { - super(tableService, poolSize); - this.interval = interval; - } - - @Override - protected long getNextExecutingTime(TableRuntime tableRuntime) { - return interval.toMillis(); - } - - @Override - protected boolean shouldExecute(Long lastCleanupEndTime) { - return System.currentTimeMillis() - lastCleanupEndTime >= interval.toMillis(); - } - - @Override - protected CleanupOperation getCleanupOperation() { - return CleanupOperation.ORPHAN_FILES_CLEANING; - } - - @Override - protected boolean enabled(TableRuntime tableRuntime) { - return tableRuntime.getTableConfiguration().isCleanOrphanEnabled(); - } - - @Override - public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { - scheduleIfNecessary(tableRuntime, getStartDelay()); - } - - @Override - protected long getExecutorDelay() { - return ThreadLocalRandom.current().nextLong(interval.toMillis()); - } - - @Override - public void execute(TableRuntime tableRuntime) { - try { - LOG.info("{} start cleaning orphan files", tableRuntime.getTableIdentifier()); - AmoroTable amoroTable = loadTable(tableRuntime); - TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, tableRuntime); - tableMaintainer.cleanOrphanFiles(); - } catch (Throwable t) { - LOG.error("{} failed to clean orphan file", tableRuntime.getTableIdentifier(), t); - } - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index 9b4b56a95c..f4a4ef73e7 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -342,8 +342,6 @@ public void beginProcess(OptimizingProcess optimizingProcess) { public long getLastCleanTime(CleanupOperation operation) { TableRuntimeCleanupState state = store().getState(CLEANUP_STATE_KEY); switch (operation) { - case ORPHAN_FILES_CLEANING: - return state.getLastOrphanFilesCleanTime(); case DANGLING_DELETE_FILES_CLEANING: return state.getLastDanglingDeleteFilesCleanTime(); case DATA_EXPIRING: @@ -362,9 +360,6 @@ public void updateLastCleanTime(CleanupOperation operation, long time) { CLEANUP_STATE_KEY, state -> { switch (operation) { - case ORPHAN_FILES_CLEANING: - state.setLastOrphanFilesCleanTime(time); - break; case DANGLING_DELETE_FILES_CLEANING: state.setLastDanglingDeleteFilesCleanTime(time); break; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java index 10afefe635..b6597db825 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java @@ -21,7 +21,6 @@ /** Table cleanup operation enum. Defines different operation types for table cleanup tasks. */ public enum CleanupOperation { DANGLING_DELETE_FILES_CLEANING, - ORPHAN_FILES_CLEANING, DATA_EXPIRING, SNAPSHOTS_EXPIRING, // NONE indicates operation types where no cleanup process records are diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java index f65ae387ca..639506ea9c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java @@ -28,8 +28,9 @@ public long getLastOrphanFilesCleanTime() { return lastOrphanFilesCleanTime; } - public void setLastOrphanFilesCleanTime(long lastOrphanFilesCleanTime) { + public TableRuntimeCleanupState setLastOrphanFilesCleanTime(long lastOrphanFilesCleanTime) { this.lastOrphanFilesCleanTime = lastOrphanFilesCleanTime; + return this; } public long getLastDanglingDeleteFilesCleanTime() { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java index 93fd182d39..916765d263 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java @@ -369,7 +369,6 @@ private String getAmsConfig() { + " refresh-table-thread-count: 10\n" + " refresh-table-interval: 60000 #1min\n" + " expire-table-thread-count: 10\n" - + " clean-orphan-file-thread-count: 10\n" + " sync-hive-tables-thread-count: 10\n" + "\n" + " thrift-server:\n" diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java index 8e52f779b8..532bc18881 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java @@ -135,14 +135,6 @@ public void testValidateThreadCount() { configurations.setInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT, 10); AmoroManagementConfValidator.validateConfig(configurations); - configurations.setBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED, true); - configurations.setInteger(AmoroManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT, -1); - Assert.assertThrows( - IllegalArgumentException.class, - () -> AmoroManagementConfValidator.validateConfig(configurations)); - configurations.setInteger(AmoroManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT, 10); - AmoroManagementConfValidator.validateConfig(configurations); - configurations.setBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED, true); configurations.setInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT, -1); Assert.assertThrows( diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java index 35151ce3d9..61b00b8332 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java @@ -43,104 +43,128 @@ public class TestIcebergProcessFactory { @Test public void testOpenAndSupportedActions() { + assertSupportedAction("expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, Duration.ofHours(1)); + assertSupportedAction( + "clean-orphan-files", IcebergActions.DELETE_ORPHANS, Duration.ofHours(24)); + } + + @Test + public void testTriggerActionWhenDue() { + assertTriggerWhenDue( + "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, SnapshotsExpiringProcess.class, 0); + assertTriggerWhenDue( + "clean-orphan-files", IcebergActions.DELETE_ORPHANS, OrphanFilesCleaningProcess.class, 0); + } + + @Test + public void testTriggerActionNotDue() { + assertTriggerNotDue( + "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, System.currentTimeMillis()); + assertTriggerNotDue( + "clean-orphan-files", IcebergActions.DELETE_ORPHANS, System.currentTimeMillis()); + } + + @Test + public void testTriggerActionDisabled() { + assertTriggerDisabled("expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, false, 0); + assertTriggerDisabled("clean-orphan-files", IcebergActions.DELETE_ORPHANS, false, 0); + } + + private void assertSupportedAction( + String configKey, org.apache.amoro.Action action, Duration interval) { IcebergProcessFactory factory = new IcebergProcessFactory(); Map properties = new HashMap<>(); - properties.put("expire-snapshots.enabled", "true"); - properties.put("expire-snapshots.interval", "1h"); + properties.put(configKey + ".enabled", "true"); + properties.put(configKey + ".interval", interval.toHours() + "h"); factory.open(properties); Map> supported = factory.supportedActions(); - Assert.assertTrue(supported.get(TableFormat.ICEBERG).contains(IcebergActions.EXPIRE_SNAPSHOTS)); - Assert.assertTrue( - supported.get(TableFormat.MIXED_ICEBERG).contains(IcebergActions.EXPIRE_SNAPSHOTS)); - Assert.assertTrue( - supported.get(TableFormat.MIXED_HIVE).contains(IcebergActions.EXPIRE_SNAPSHOTS)); - - ProcessTriggerStrategy strategy = - factory.triggerStrategy(TableFormat.ICEBERG, IcebergActions.EXPIRE_SNAPSHOTS); - Assert.assertEquals(Duration.ofHours(1), strategy.getTriggerInterval()); + Assert.assertTrue(supported.get(TableFormat.ICEBERG).contains(action)); + Assert.assertTrue(supported.get(TableFormat.MIXED_ICEBERG).contains(action)); + Assert.assertTrue(supported.get(TableFormat.MIXED_HIVE).contains(action)); + + ProcessTriggerStrategy strategy = factory.triggerStrategy(TableFormat.ICEBERG, action); + Assert.assertEquals(interval, strategy.getTriggerInterval()); } - @Test - public void testTriggerExpireSnapshotWhenDue() { + private void assertTriggerWhenDue( + String configKey, org.apache.amoro.Action action, Class processClass, long lastTime) { IcebergProcessFactory factory = new IcebergProcessFactory(); Map properties = new HashMap<>(); - properties.put("expire-snapshots.enabled", "true"); - properties.put("expire-snapshots.interval", "1h"); + properties.put(configKey + ".enabled", "true"); + properties.put(configKey + ".interval", "1h"); factory.open(properties); LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class); doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name(); factory.availableExecuteEngines(Arrays.asList(localEngine)); - TableConfiguration tableConfiguration = new TableConfiguration().setExpireSnapshotEnabled(true); - TableRuntimeCleanupState cleanupState = - new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(0); + TableRuntime runtime = createRuntime(configKey, true, lastTime); - TableRuntime runtime = mock(TableRuntime.class); - doReturn(tableConfiguration).when(runtime).getTableConfiguration(); - doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY); - - Optional process = - factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS); + Optional process = factory.trigger(runtime, action); Assert.assertTrue(process.isPresent()); - Assert.assertTrue(process.get() instanceof SnapshotsExpiringProcess); + Assert.assertTrue(processClass.isInstance(process.get())); Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME, process.get().getExecutionEngine()); } - @Test - public void testTriggerExpireSnapshotNotDue() { + private void assertTriggerNotDue( + String configKey, org.apache.amoro.Action action, long lastTime) { IcebergProcessFactory factory = new IcebergProcessFactory(); Map properties = new HashMap<>(); - properties.put("expire-snapshots.enabled", "true"); - properties.put("expire-snapshots.interval", "1h"); + properties.put(configKey + ".enabled", "true"); + properties.put(configKey + ".interval", "1h"); factory.open(properties); factory.availableExecuteEngines(Arrays.asList(mock(LocalExecutionEngine.class))); - TableConfiguration tableConfiguration = new TableConfiguration().setExpireSnapshotEnabled(true); - long now = System.currentTimeMillis(); - TableRuntimeCleanupState cleanupState = - new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(now); - - TableRuntime runtime = mock(TableRuntime.class); - doReturn(tableConfiguration).when(runtime).getTableConfiguration(); - doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY); + TableRuntime runtime = createRuntime(configKey, true, lastTime); - Optional process = - factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS); + Optional process = factory.trigger(runtime, action); Assert.assertFalse(process.isPresent()); } - @Test - public void testTriggerExpireSnapshotDisabled() { + private void assertTriggerDisabled( + String configKey, org.apache.amoro.Action action, boolean enabled, long lastTime) { IcebergProcessFactory factory = new IcebergProcessFactory(); Map properties = new HashMap<>(); - properties.put("expire-snapshots.enabled", "true"); - properties.put("expire-snapshots.interval", "1h"); + properties.put(configKey + ".enabled", "true"); + properties.put(configKey + ".interval", "1h"); factory.open(properties); factory.availableExecuteEngines(Arrays.asList(mock(LocalExecutionEngine.class))); - TableConfiguration tableConfiguration = - new TableConfiguration().setExpireSnapshotEnabled(false); - TableRuntimeCleanupState cleanupState = - new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(0); + TableRuntime runtime = createRuntime(configKey, enabled, lastTime); + + Optional process = factory.trigger(runtime, action); + + Assert.assertFalse(process.isPresent()); + } + + private TableRuntime createRuntime(String configKey, boolean enabled, long lastTime) { + TableConfiguration tableConfiguration = new TableConfiguration(); + if ("expire-snapshots".equals(configKey)) { + tableConfiguration.setExpireSnapshotEnabled(enabled); + } else if ("clean-orphan-files".equals(configKey)) { + tableConfiguration.setCleanOrphanEnabled(enabled); + } + + TableRuntimeCleanupState cleanupState = new TableRuntimeCleanupState(); + if ("expire-snapshots".equals(configKey)) { + cleanupState.setLastSnapshotsExpiringTime(lastTime); + } else if ("clean-orphan-files".equals(configKey)) { + cleanupState.setLastOrphanFilesCleanTime(lastTime); + } TableRuntime runtime = mock(TableRuntime.class); doReturn(tableConfiguration).when(runtime).getTableConfiguration(); doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY); - - Optional process = - factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS); - - Assert.assertFalse(process.isPresent()); + return runtime; } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java index 50c0e9d19d..ba2a8767c6 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java @@ -31,7 +31,6 @@ class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler { private final CleanupOperation cleanupOperation; private final boolean enabled; private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour - private static final long ORPHAN_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L; // 1 day private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L; private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour @@ -73,8 +72,6 @@ protected boolean shouldExecute(Long lastCleanupEndTime) { switch (cleanupOperation) { case SNAPSHOTS_EXPIRING: return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL; - case ORPHAN_FILES_CLEANING: - return currentTime - lastCleanupEndTime >= ORPHAN_FILES_CLEANING_INTERVAL; case DANGLING_DELETE_FILES_CLEANING: return currentTime - lastCleanupEndTime >= DANGLING_DELETE_FILES_CLEANING_INTERVAL; case DATA_EXPIRING: diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java index de25eac21d..fef2a3bd42 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java @@ -167,7 +167,6 @@ private PeriodicTableSchedulerTestBase createTestExecutor(CleanupOperation clean public void testShouldExecuteTaskWithNoPreviousCleanup() { List operations = Arrays.asList( - CleanupOperation.ORPHAN_FILES_CLEANING, CleanupOperation.DANGLING_DELETE_FILES_CLEANING, CleanupOperation.DATA_EXPIRING, CleanupOperation.SNAPSHOTS_EXPIRING); @@ -192,7 +191,6 @@ public void testShouldExecuteTaskWithNoPreviousCleanup() { public void testShouldNotExecuteTaskWithRecentCleanup() { List operations = Arrays.asList( - CleanupOperation.ORPHAN_FILES_CLEANING, CleanupOperation.DANGLING_DELETE_FILES_CLEANING, CleanupOperation.DATA_EXPIRING, CleanupOperation.SNAPSHOTS_EXPIRING); @@ -222,7 +220,6 @@ public void testShouldNotExecuteTaskWithRecentCleanup() { public void testShouldExecuteTaskWithOldCleanup() { List operations = Arrays.asList( - CleanupOperation.ORPHAN_FILES_CLEANING, CleanupOperation.DANGLING_DELETE_FILES_CLEANING, CleanupOperation.DATA_EXPIRING, CleanupOperation.SNAPSHOTS_EXPIRING); diff --git a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java index 6c67a20970..ed1b26d53d 100644 --- a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java +++ b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java @@ -45,6 +45,11 @@ public void tearDown() { @Test public void testSubmitUsesCustomPoolByTag() throws Exception { + assertCustomPoolByTag("snapshots-expiring"); + assertCustomPoolByTag("orphan-files-cleaning"); + } + + private void assertCustomPoolByTag(String tag) throws Exception { engine = createEngineWithTtl("1h"); CountDownLatch started = new CountDownLatch(1); @@ -54,7 +59,7 @@ public void testSubmitUsesCustomPoolByTag() throws Exception { new LocalProcessTableProcess( mock(TableRuntime.class), engine, - "snapshots-expiring", + tag, () -> { threadName.set(Thread.currentThread().getName()); started.countDown(); @@ -64,7 +69,7 @@ public void testSubmitUsesCustomPoolByTag() throws Exception { Assertions.assertTrue(started.await(5, TimeUnit.SECONDS), "process should start"); Assertions.assertTrue( - threadName.get() != null && threadName.get().startsWith("local-snapshots-expiring-"), + threadName.get() != null && threadName.get().startsWith("local-" + tag + "-"), "should run in custom pool"); waitForStatus(identifier, ProcessStatus.SUCCESS, 5000); @@ -148,6 +153,7 @@ private LocalExecutionEngine createEngineWithTtl(String ttl) { Map properties = new HashMap<>(); properties.put("pool.default.thread-count", "1"); properties.put("pool.snapshots-expiring.thread-count", "1"); + properties.put("pool.orphan-files-cleaning.thread-count", "1"); properties.put("process.status.ttl", ttl); localEngine.open(properties); return localEngine; diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java index eb89c07c09..f906d4b866 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java @@ -1046,7 +1046,7 @@ public static boolean willNotRetain( .latestExpiredSeq; // only expire delete files with sequence-number less or equal to expired data file // there may be some dangling delete files, they will be cleaned by - // OrphanFileCleaningExecutor + // OrphanFilesCleaningProcess return fileEntry.getFile().dataSequenceNumber() <= seqUpperBound; } else { return true; diff --git a/charts/amoro/templates/amoro-configmap.yaml b/charts/amoro/templates/amoro-configmap.yaml index e0638059d8..d00566f2cd 100644 --- a/charts/amoro/templates/amoro-configmap.yaml +++ b/charts/amoro/templates/amoro-configmap.yaml @@ -103,10 +103,6 @@ data: enabled: true thread-count: 10 - clean-orphan-files: - enabled: true - thread-count: 10 - clean-dangling-delete-files: enabled: true thread-count: 10 diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml index 65446fe2bb..e5fc2654a6 100644 --- a/dist/src/main/amoro-bin/conf/config.yaml +++ b/dist/src/main/amoro-bin/conf/config.yaml @@ -112,11 +112,6 @@ ams: enabled: true thread-count: 10 - clean-orphan-files: - enabled: true - thread-count: 10 - interval: 1d - clean-dangling-delete-files: enabled: true thread-count: 10 diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml index 5c3199f320..52e0626295 100755 --- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml @@ -22,3 +22,4 @@ execute-engines: properties: pool.default.thread-count: 10 pool.snapshots-expiring.thread-count: 10 + pool.orphan-files-cleaning.thread-count: 10 diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml index e1455c2cf1..5825fde341 100755 --- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml @@ -22,3 +22,5 @@ process-factories: properties: expire-snapshots.enabled: "true" expire-snapshots.interval: "1h" + clean-orphan-files.enabled: "true" + clean-orphan-files.interval: "1d" diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md index 57f1315332..060923b3c0 100644 --- a/docs/admin-guides/deployment.md +++ b/docs/admin-guides/deployment.md @@ -247,6 +247,70 @@ scrape_configs: - targets: ['localhost:9090'] # The host and port that you configured in Amoro plugins configs file. ``` +### Configure process factories + +Process factories manage table maintenance actions like snapshot expiration and orphan file cleaning. +They are configured in `$AMORO_CONF_DIR/plugins/process-factories.yaml`. + +The configuration format is: + +```yaml +process-factories: + - name: # the unified plugin name. + enabled: # if this plugin is enabled, default is true. + priority: # plugin priority for loading order. + properties: # a map defines properties of plugin. +``` + +Currently, the `iceberg` process factory is available for Iceberg, Mixed-Iceberg, and Mixed-Hive formats: + +```yaml +process-factories: + - name: iceberg + enabled: true + priority: 100 + properties: + expire-snapshots.enabled: "true" # enable snapshots expiring + expire-snapshots.interval: "1h" # interval for expiring snapshots + clean-orphan-files.enabled: "true" # enable orphan files cleaning + clean-orphan-files.interval: "1d" # interval for cleaning orphan files +``` + +{{< hint info >}} +Process-level properties control whether an action is registered. Table-level properties (see [Table Configurations](../../user-guides/configurations/)) control whether a specific table executes the action. +{{< /hint >}} + +### Configure execute engines + +Execute engines define how maintenance processes are executed. The `local` engine runs processes in AMS thread pools. +They are configured in `$AMORO_CONF_DIR/plugins/execute-engines.yaml`. + +The configuration format is: + +```yaml +execute-engines: + - name: # the unified plugin name. + enabled: # if this engine is enabled, default is true. + priority: # engine priority for loading order. + properties: # a map defines properties of engine. +``` + +```yaml +execute-engines: + - name: local + enabled: true + priority: 100 + properties: + pool.default.thread-count: 10 # default thread pool size + pool.snapshots-expiring.thread-count: 10 # thread pool for snapshot expiration + pool.orphan-files-cleaning.thread-count: 10 # thread pool for orphan file cleaning + process.status.ttl: 4h # TTL for process status cache +``` + +{{< hint info >}} +Custom pools use the pattern `pool..thread-count`. Processes select pools via their `tag()` method, falling back to the default pool if no match. +{{< /hint >}} + ### Configure encrypted configuration items For enhanced security, AMS supports encrypted values for sensitive configuration items such as passwords within `config.yaml`. This prevents plaintext passwords and other critical information from being directly exposed in the configuration file. Currently, AMS provides built-in support for base64 decryption, and users can also implement custom decryption algorithms if needed (see [Using Customized Encryption Method for Configurations](../using-customized-encryption-method/)). diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md index c00841dae7..aac5c4c42d 100644 --- a/docs/configuration/ams-config.md +++ b/docs/configuration/ams-config.md @@ -52,9 +52,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; } | clean-dangling-delete-files.enabled | true | Enable dangling delete files cleaning. | | clean-dangling-delete-files.interval | 1 d | Interval for cleaning dangling delete files. | | clean-dangling-delete-files.thread-count | 10 | The number of threads used for dangling delete files cleaning. | -| clean-orphan-files.enabled | true | Enable orphan files cleaning. | -| clean-orphan-files.interval | 1 d | Interval for cleaning orphan files. | -| clean-orphan-files.thread-count | 10 | The number of threads used for orphan files cleaning. | | data-expiration.enabled | true | Enable data expiration | | data-expiration.interval | 1 d | Execute interval for data expiration | | data-expiration.thread-count | 10 | The number of threads used for data expiring |