Skip to content

1 delete orphan files #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/scratch/.ice-rest-catalog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ s3:
bearerTokens:
- value: foo

maintenanceSchedule: "every 1 minutes"
orphanFileExpirationDays: 1
anonymousAccess:
enabled: true
accessConfig: {}
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,10 @@ private void initializeMaintenanceScheduler(Catalog catalog, Config config) {
try {
MaintenanceScheduler scheduler =
new MaintenanceScheduler(
catalog, config.maintenanceSchedule(), config.snapshotTTLInDays());
catalog,
config.maintenanceSchedule(),
config.snapshotTTLInDays(),
config.orphanFileExpirationDays());
scheduler.startScheduledMaintenance();
logger.info(
"Maintenance scheduler initialized with schedule: {}", config.maintenanceSchedule());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public record Config(
"Maintenance schedule in https://github.com/shyiko/skedule?tab=readme-ov-file#format format, e.g. \"every day 00:00\". Empty schedule disables automatic maintenance (default)")
String maintenanceSchedule,
@JsonPropertyDescription("TTL for snapshots in days.") int snapshotTTLInDays,
@JsonPropertyDescription("TTL for orphan files in days.") int orphanFileExpirationDays,
@JsonPropertyDescription(
"(experimental) Extra properties to include in loadTable REST response.")
Map<String, String> loadTableProperties,
Expand All @@ -77,6 +78,7 @@ public Config(
AnonymousAccess anonymousAccess,
String maintenanceSchedule,
int snapshotTTLInDays,
int orphanFileExpirationDays,
Map<String, String> loadTableProperties,
@JsonProperty("iceberg") Map<String, String> icebergProperties) {
this.addr = Strings.orDefault(addr, DEFAULT_ADDR);
Expand All @@ -91,6 +93,7 @@ public Config(
Objects.requireNonNullElse(anonymousAccess, new AnonymousAccess(false, null));
this.maintenanceSchedule = maintenanceSchedule;
this.snapshotTTLInDays = snapshotTTLInDays;
this.orphanFileExpirationDays = orphanFileExpirationDays;
this.loadTableProperties = Objects.requireNonNullElse(loadTableProperties, Map.of());
this.icebergProperties = Objects.requireNonNullElse(icebergProperties, Map.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,13 +37,16 @@ public class MaintenanceScheduler {

private ScheduledFuture<?> currentTask;
private final Integer snapshotExpirationDays;
private final Integer orphanFileExpirationDays;

public MaintenanceScheduler(Catalog catalog, String schedule, int snapshotExpirationDays) {
public MaintenanceScheduler(
Catalog catalog, String schedule, int snapshotExpirationDays, int orphanFileExpirationDays) {
this.catalog = catalog;
this.executor = new ScheduledThreadPoolExecutor(1);
((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true);
this.schedule = Schedule.parse(schedule);
this.snapshotExpirationDays = snapshotExpirationDays;
this.orphanFileExpirationDays = orphanFileExpirationDays;
}

public void startScheduledMaintenance() {
Expand Down Expand Up @@ -109,8 +113,15 @@ public void performMaintenance() {
for (TableIdentifier tableIdent : tables) {
long olderThanMillis =
System.currentTimeMillis() - TimeUnit.DAYS.toMillis(snapshotExpirationDays);
Table table = catalog.loadTable(tableIdent);

// This throws a Location does not exist error.
Table table = null;
try {
table = catalog.loadTable(tableIdent);
} catch (NotFoundException ne) {
logger.warn("Table {} location not found, skipping maintenance", tableIdent, ne);
continue;
}
// Check if table has any snapshots before performing maintenance
if (table.currentSnapshot() == null) {
logger.warn("Table {} has no snapshots, skipping maintenance", tableIdent);
Expand All @@ -119,6 +130,30 @@ public void performMaintenance() {

table.rewriteManifests().rewriteIf(manifest -> true).commit();
table.expireSnapshots().expireOlderThan(olderThanMillis).commit();

if (orphanFileExpirationDays == 0) {
logger.info(
"Skipping orphan file removal for table {} since orphanFileExpirationDays config was not set",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this check can be avoided for every table if we use a separate iteration; not sure whether this is the best option.

tableIdent);
continue;
}
long orphanCutOffMillis =
System.currentTimeMillis() - TimeUnit.DAYS.toMillis(orphanFileExpirationDays);
// Remove orphans only for S3-based tables
String tableLocation = table.location();
if (tableLocation != null && tableLocation.startsWith("s3://")) {
OrphanFileScanner orphanFileScanner = new OrphanFileScanner(table);
try {
orphanFileScanner.removeOrphanedFiles(orphanCutOffMillis, false);
} catch (Exception e) {
logger.warn("Failed to remove orphan files for table {}", tableIdent, e);
}
} else {
logger.debug(
"Skipping orphan file removal for non-S3 table: {} (location: {})",
tableIdent,
tableLocation);
}
}
}
logger.info("Maintenance operations completed for catalog: {}", catalog.name());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package com.altinity.ice.rest.catalog.internal.maintenance;

import com.altinity.ice.internal.iceberg.io.SchemeFileIO;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrphanFileScanner {
private static final Logger logger = LoggerFactory.getLogger(OrphanFileScanner.class);
private final Table table;

public OrphanFileScanner(Table table) {
this.table = table;
}

private Set<String> getAllKnownFiles() throws IOException {
Set<String> knownFiles = new HashSet<>();

for (Snapshot snapshot : table.snapshots()) {
if (snapshot.manifestListLocation() != null) {
knownFiles.add(snapshot.manifestListLocation());
}

FileIO io = table.io();

TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();

String currentMetadataFile = meta.metadataFileLocation();
// Current metadata json file
knownFiles.add(currentMetadataFile);

// All the previous metadata JSON(is there a chance there might be
// json files that are not physically present?.
for (TableMetadata.MetadataLogEntry previousFile : meta.previousFiles()) {
knownFiles.add(previousFile.file());
}

for (ManifestFile manifest : snapshot.dataManifests(io)) {
knownFiles.add(manifest.path());

// Add data files inside each manifest
try (CloseableIterable<DataFile> files = ManifestFiles.read(manifest, table.io())) {
for (DataFile dataFile : files) {
knownFiles.add(dataFile.path().toString());
}
} catch (Exception e) {
throw e;
}
}
}

return knownFiles;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The result looks incomplete. Inserting a single file into empty table yields

./metadata/00000-f62c927f-3965-42e9-8f3a-57809ca49f55.metadata.json
./metadata/4b00c8c2-6d25-4ba5-bf27-989fbdb9daa1-m0.avro
./metadata/snap-7186194791324723249-1-4b00c8c2-6d25-4ba5-bf27-989fbdb9daa1.avro
./metadata/00001-07fe7fa7-8fe1-4666-bd7b-7c88b9c861fe.metadata.json
./data/pickup_datetime_month=2009-01/1750715797525-2d861d7a4e5196bf04e5c3be506b0152f4170cf4e941424c8245b46903c3e252-part.parquet

i.e. we're going to nuke a bunch of critical files and break the catalog...

}

public Set<String> findOrphanedFiles(String location, long olderThanMillis) throws IOException {
Set<String> knownFiles = getAllKnownFiles();

FileIO tableIO = table.io();

SchemeFileIO schemeFileIO;
if (tableIO instanceof SchemeFileIO) {
schemeFileIO = (SchemeFileIO) tableIO;
} else {
throw new UnsupportedOperationException("SchemeFileIO is required for S3 locations");
}

Set<String> allFiles = new HashSet<>();

Iterable<FileInfo> fileInfos = schemeFileIO.listPrefix(location);
for (FileInfo fileInfo : fileInfos) {
if (fileInfo.createdAtMillis() < olderThanMillis) {
allFiles.add(fileInfo.location());
}
}

allFiles.removeAll(knownFiles);

return allFiles;
}

public void removeOrphanedFiles(long olderThanMillis, boolean dryRun) throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

based on current impl this method should be executed only when warehouse is s3 (not s3 tables, etc)

String location = table.location();
logger.info("Looking for Orphaned files in location {}", location);
Set<String> orphanedFiles = findOrphanedFiles(location, olderThanMillis);

logger.info("Found {} orphaned files at {}!", orphanedFiles.size(), location);
// log all the orphaned files
orphanedFiles.forEach(f -> logger.info("Orphaned file: {}", f));

if (orphanedFiles.isEmpty()) {
logger.info("No orphaned files found at {}!", location);
return;
}

if (dryRun) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dryRun should probably a config variable, so they can try before toggling it?

logger.info(
"(Dry Run) Would delete {} orphaned files at {}!", orphanedFiles.size(), location);
orphanedFiles.forEach(f -> logger.info("Orphaned file: {}", f));
} else {

int numThreads = Math.min(8, orphanedFiles.size());

ExecutorService executor = Executors.newFixedThreadPool(numThreads);
List<Future<String>> futures =
orphanedFiles.stream()
.map(
file ->
executor.submit(
() -> {
try {
table.io().deleteFile(file);
return file;
} catch (Exception e) {
logger.warn("Failed to delete file {}", file, e);
return null;
}
}))
.collect(Collectors.toList());

executor.shutdown();

List<String> deletedFiles = new ArrayList<>();
for (Future<String> future : futures) {
try {
String result = future.get();
if (result != null) {
deletedFiles.add(result);
}
} catch (Exception e) {
logger.error("Error during file deletion", e);
return;
}
}

logger.info("Deleted {} orphaned files at {}!", deletedFiles.size(), location);
if (!deletedFiles.isEmpty()) {
deletedFiles.forEach(f -> logger.info("Deleted: {}", f));
}
}
}
}