diff --git a/examples/scratch/.ice-rest-catalog.yaml b/examples/scratch/.ice-rest-catalog.yaml
index 51fe0e2..c3898a6 100644
--- a/examples/scratch/.ice-rest-catalog.yaml
+++ b/examples/scratch/.ice-rest-catalog.yaml
@@ -15,6 +15,8 @@ s3:
 bearerTokens:
 - value: foo
+maintenanceSchedule: "every 1 minutes"
+orphanFileExpirationDays: 1
 anonymousAccess:
   enabled: true
   accessConfig: {}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java
index feaab56..588981a 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java
@@ -359,7 +359,10 @@ private void initializeMaintenanceScheduler(Catalog catalog, Config config) {
     try {
       MaintenanceScheduler scheduler =
           new MaintenanceScheduler(
-              catalog, config.maintenanceSchedule(), config.snapshotTTLInDays());
+              catalog,
+              config.maintenanceSchedule(),
+              config.snapshotTTLInDays(),
+              config.orphanFileExpirationDays());
       scheduler.startScheduledMaintenance();
       logger.info(
           "Maintenance scheduler initialized with schedule: {}", config.maintenanceSchedule());
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java
index 21012f5..2923ab3 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java
@@ -53,6 +53,7 @@ public record Config(
             "Maintenance schedule in https://github.com/shyiko/skedule?tab=readme-ov-file#format format, e.g. \"every day 00:00\". Empty schedule disables automatic maintenance (default)")
         String maintenanceSchedule,
     @JsonPropertyDescription("TTL for snapshots in days.") int snapshotTTLInDays,
+    @JsonPropertyDescription("TTL for orphan files in days.") int orphanFileExpirationDays,
     @JsonPropertyDescription(
             "(experimental) Extra properties to include in loadTable REST response.")
         Map<String, String> loadTableProperties,
@@ -63,6 +64,7 @@ public record Config(

   private static final String DEFAULT_ADDR = "0.0.0.0:5000";
   private static final String DEFAULT_DEBUG_ADDR = "0.0.0.0:5001";
+  private static final int DEFAULT_ORPHAN_FILE_EXPIRATION_DAYS = 10;

   @JsonCreator
   public Config(
@@ -77,6 +79,7 @@ public Config(
       AnonymousAccess anonymousAccess,
       String maintenanceSchedule,
       int snapshotTTLInDays,
+      int orphanFileExpirationDays,
       Map<String, String> loadTableProperties,
       @JsonProperty("iceberg") Map<String, String> icebergProperties) {
     this.addr = Strings.orDefault(addr, DEFAULT_ADDR);
@@ -91,6 +94,10 @@ public Config(
         Objects.requireNonNullElse(anonymousAccess, new AnonymousAccess(false, null));
     this.maintenanceSchedule = maintenanceSchedule;
     this.snapshotTTLInDays = snapshotTTLInDays;
+    this.orphanFileExpirationDays =
+        orphanFileExpirationDays == 0
+            ? DEFAULT_ORPHAN_FILE_EXPIRATION_DAYS
+            : orphanFileExpirationDays;
     this.loadTableProperties = Objects.requireNonNullElse(loadTableProperties, Map.of());
     this.icebergProperties = Objects.requireNonNullElse(icebergProperties, Map.of());
   }
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java
index 954fd66..00f9103 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java
@@ -22,6 +22,7 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,13 +37,16 @@ public class MaintenanceScheduler {
   private ScheduledFuture<?> currentTask;

   private final Integer snapshotExpirationDays;
+  private final Integer orphanFileExpirationDays;

-  public MaintenanceScheduler(Catalog catalog, String schedule, int snapshotExpirationDays) {
+  public MaintenanceScheduler(
+      Catalog catalog, String schedule, int snapshotExpirationDays, int orphanFileExpirationDays) {
     this.catalog = catalog;
     this.executor = new ScheduledThreadPoolExecutor(1);
     ((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true);
     this.schedule = Schedule.parse(schedule);
     this.snapshotExpirationDays = snapshotExpirationDays;
+    this.orphanFileExpirationDays = orphanFileExpirationDays;
   }

   public void startScheduledMaintenance() {
@@ -109,8 +113,15 @@ public void performMaintenance() {
       for (TableIdentifier tableIdent : tables) {
         long olderThanMillis =
             System.currentTimeMillis() - TimeUnit.DAYS.toMillis(snapshotExpirationDays);
-        Table table = catalog.loadTable(tableIdent);
+        // loadTable can throw NotFoundException ("Location does not exist"); skip such tables.
+        Table table = null;
+        try {
+          table = catalog.loadTable(tableIdent);
+        } catch (NotFoundException ne) {
+          logger.warn("Table {} location not found, skipping maintenance", tableIdent, ne);
+          continue;
+        }
         // Check if table has any snapshots before performing maintenance
         if (table.currentSnapshot() == null) {
           logger.warn("Table {} has no snapshots, skipping maintenance", tableIdent);
@@ -119,6 +130,24 @@ public void performMaintenance() {

         table.rewriteManifests().rewriteIf(manifest -> true).commit();
         table.expireSnapshots().expireOlderThan(olderThanMillis).commit();
+
+        long orphanCutOffMillis =
+            System.currentTimeMillis() - TimeUnit.DAYS.toMillis(orphanFileExpirationDays);
+        // Remove orphans only for S3-based tables
+        String tableLocation = table.location();
+        if (tableLocation != null && tableLocation.startsWith("s3://")) {
+          OrphanFileScanner orphanFileScanner = new OrphanFileScanner(table);
+          try {
+            orphanFileScanner.removeOrphanedFiles(orphanCutOffMillis, false);
+          } catch (Exception e) {
+            logger.warn("Failed to remove orphan files for table {}", tableIdent, e);
+          }
+        } else {
+          logger.debug(
+              "Skipping orphan file removal for non-S3 table: {} (location: {})",
+              tableIdent,
+              tableLocation);
+        }
       }
     }
     logger.info("Maintenance operations completed for catalog: {}", catalog.name());
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java
new file mode 100644
index 0000000..32fcd7b
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.maintenance;
+
+import com.altinity.ice.internal.iceberg.io.SchemeFileIO;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OrphanFileScanner {
+  private static final Logger logger = LoggerFactory.getLogger(OrphanFileScanner.class);
+  private final Table table;
+
+  public OrphanFileScanner(Table table) {
+    this.table = table;
+  }
+
+  private Set<String> getAllKnownFiles() throws IOException {
+    Set<String> knownFiles = new HashSet<>();
+
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.manifestListLocation() != null) {
+        knownFiles.add(snapshot.manifestListLocation());
+      }
+
+      FileIO io = table.io();
+
+      TableOperations ops = ((BaseTable) table).operations();
+      TableMetadata meta = ops.current();
+
+      String currentMetadataFile = meta.metadataFileLocation();
+      // Current metadata json file
+      knownFiles.add(currentMetadataFile);
+
+      // All previous metadata JSON files (note: some entries in the metadata log
+      // may reference files that no longer physically exist).
+      for (TableMetadata.MetadataLogEntry previousFile : meta.previousFiles()) {
+        knownFiles.add(previousFile.file());
+      }
+
+      for (ManifestFile manifest : snapshot.dataManifests(io)) {
+        knownFiles.add(manifest.path());
+
+        // Add data files inside each manifest
+        try (CloseableIterable<DataFile> files = ManifestFiles.read(manifest, table.io())) {
+          for (DataFile dataFile : files) {
+            knownFiles.add(dataFile.path().toString());
+          }
+        } catch (Exception e) {
+          throw e;
+        }
+      }
+    }
+
+    return knownFiles;
+  }
+
+  public Set<String> findOrphanedFiles(String location, long olderThanMillis) throws IOException {
+    Set<String> knownFiles = getAllKnownFiles();
+
+    FileIO tableIO = table.io();
+
+    SchemeFileIO schemeFileIO;
+    if (tableIO instanceof SchemeFileIO) {
+      schemeFileIO = (SchemeFileIO) tableIO;
+    } else {
+      throw new UnsupportedOperationException("SchemeFileIO is required for S3 locations");
+    }
+
+    Set<String> allFiles = new HashSet<>();
+
+    Iterable<FileInfo> fileInfos = schemeFileIO.listPrefix(location);
+    for (FileInfo fileInfo : fileInfos) {
+      if (fileInfo.createdAtMillis() < olderThanMillis) {
+        allFiles.add(fileInfo.location());
+      }
+    }
+
+    allFiles.removeAll(knownFiles);
+
+    return allFiles;
+  }
+
+  public void removeOrphanedFiles(long olderThanMillis, boolean dryRun) throws IOException {
+    String location = table.location();
+    logger.info("Looking for orphaned files in location {}", location);
+    Set<String> orphanedFiles = findOrphanedFiles(location, olderThanMillis);
+
+    logger.info("Found {} orphaned files at {}!", orphanedFiles.size(), location);
+    // log all the orphaned files
+    orphanedFiles.forEach(f -> logger.info("Orphaned file: {}", f));
+
+    if (orphanedFiles.isEmpty()) {
+      logger.info("No orphaned files found at {}!", location);
+      return;
+    }
+
+    if (dryRun) {
+      logger.info(
+          "(Dry Run) Would delete {} orphaned files at {}!", orphanedFiles.size(), location);
+      orphanedFiles.forEach(f -> logger.info("Orphaned file: {}", f));
+    } else {
+
+      int numThreads = Math.min(8, orphanedFiles.size());
+
+      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+      List<Future<String>> futures =
+          orphanedFiles.stream()
+              .map(
+                  file ->
+                      executor.submit(
+                          () -> {
+                            try {
+                              table.io().deleteFile(file);
+                              return file;
+                            } catch (Exception e) {
+                              logger.warn("Failed to delete file {}", file, e);
+                              return null;
+                            }
+                          }))
+              .collect(Collectors.toList());
+
+      executor.shutdown();
+
+      List<String> deletedFiles = new ArrayList<>();
+      for (Future<String> future : futures) {
+        try {
+          String result = future.get();
+          if (result != null) {
+            deletedFiles.add(result);
+          }
+        } catch (Exception e) {
+          logger.error("Error during file deletion", e);
+          return;
+        }
+      }
+
+      logger.info("Deleted {} orphaned files at {}!", deletedFiles.size(), location);
+      if (!deletedFiles.isEmpty()) {
+        deletedFiles.forEach(f -> logger.info("Deleted: {}", f));
+      }
+    }
+  }
+}
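
Note (not part of the patch): a minimal, hypothetical sketch of exercising the new OrphanFileScanner directly in dry-run mode, using the same day-based cutoff as MaintenanceScheduler. Only the OrphanFileScanner constructor, removeOrphanedFiles, and findOrphanedFiles calls come from the change above; the class name OrphanFileScanDryRun and the surrounding wiring are assumptions, and the table's FileIO must be a SchemeFileIO for findOrphanedFiles to succeed.

// Hypothetical usage sketch (not part of this change): preview orphan-file removal
// for an already-loaded Iceberg table without deleting anything.
package com.altinity.ice.rest.catalog.internal.maintenance;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;

public final class OrphanFileScanDryRun {

  private OrphanFileScanDryRun() {}

  /** Logs (but does not delete) orphan candidates older than expirationDays, then returns them. */
  public static Set<String> preview(Table table, int expirationDays) throws IOException {
    long cutOffMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(expirationDays);
    OrphanFileScanner scanner = new OrphanFileScanner(table);
    // dryRun=true: removeOrphanedFiles only logs what it would delete.
    scanner.removeOrphanedFiles(cutOffMillis, true);
    // findOrphanedFiles returns the same candidate set for the caller to inspect;
    // it requires the table's FileIO to be a SchemeFileIO.
    return scanner.findOrphanedFiles(table.location(), cutOffMillis);
  }
}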