Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinCompressionCodec;
import org.apache.iceberg.puffin.PuffinReader;
Expand Down Expand Up @@ -676,24 +677,22 @@ private static void rewriteDVFile(
String sourcePrefix,
String targetPrefix)
throws IOException {
List<Blob> rewrittenBlobs = Lists.newArrayList();
try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build()) {
// Read all blobs and rewrite them with updated referenced data file paths
for (Pair<org.apache.iceberg.puffin.BlobMetadata, ByteBuffer> blobPair :
try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build();
PuffinWriter writer =
Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) {
for (Pair<BlobMetadata, ByteBuffer> blobPair :
reader.readAll(reader.fileMetadata().blobs())) {
org.apache.iceberg.puffin.BlobMetadata blobMetadata = blobPair.first();
BlobMetadata blobMetadata = blobPair.first();
ByteBuffer blobData = blobPair.second();

// Get the original properties and update the referenced data file path
Map<String, String> properties = Maps.newHashMap(blobMetadata.properties());
String referencedDataFile = properties.get("referenced-data-file");
if (referencedDataFile != null && referencedDataFile.startsWith(sourcePrefix)) {
String newReferencedDataFile = newPath(referencedDataFile, sourcePrefix, targetPrefix);
properties.put("referenced-data-file", newReferencedDataFile);
}

// Create a new blob with updated properties
rewrittenBlobs.add(
writer.write(
new Blob(
blobMetadata.type(),
blobMetadata.inputFields(),
Expand All @@ -704,11 +703,6 @@ private static void rewriteDVFile(
properties));
}
}

try (PuffinWriter writer =
Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) {
rewrittenBlobs.forEach(writer::write);
}
}

private static PositionDelete newPositionDeleteRecord(
Expand Down
150 changes: 150 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,31 @@
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.puffin.StandardBlobTypes;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.Pair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ParameterizedTestExtension.class)
public class TestRewriteTablePathUtil extends TestBase {
private static final String REFERENCED_DATA_FILE = "referenced-data-file";
private static final String CARDINALITY = "cardinality";

@Test
public void testStagingPathPreservesDirectoryStructure() {
Expand Down Expand Up @@ -281,4 +299,136 @@ public void testRewritingMultiplePositionDeleteEntriesWithinManifestFile() throw

assertThat(deleteFileRewriteResult.toRewrite()).hasSize(2);
}

@TestTemplate
void rewriteDVFileRewritesReferencedDataFileInBlobMetadata() throws IOException {
assumeThat(formatVersion).as("DVs require format version 3+").isGreaterThanOrEqualTo(3);

String sourcePrefix = temp.resolve("source").toString();
String targetPrefix = temp.resolve("target").toString();
String sourceDataFile = sourcePrefix + "/data/file-a.parquet";
String externalDataFile = temp.resolve("external/data/file-b.parquet").toString();

PositionDeleteIndex sourceDeletes = positionDeleteIndex(1L, 4L);
PositionDeleteIndex externalDeletes = positionDeleteIndex(2L, 5L, 8L);
byte[] sourcePayload = serializedDV(sourceDeletes);
byte[] externalPayload = serializedDV(externalDeletes);

OutputFile sourceDVFile =
Files.localOutput(
temp.resolve("source/metadata/dv-" + System.nanoTime() + ".puffin").toString());
try (PuffinWriter writer = Puffin.write(sourceDVFile).createdBy("test").build()) {
writer.write(newDVBlob(sourceDeletes, sourcePayload, sourceDataFile));
writer.write(newDVBlob(externalDeletes, externalPayload, externalDataFile));
}

List<BlobMetadata> sourceBlobMetadata;
try (PuffinReader reader = Puffin.read(sourceDVFile.toInputFile()).build()) {
sourceBlobMetadata = reader.fileMetadata().blobs();
}
assertThat(sourceBlobMetadata).hasSize(2);

DeleteFile dvDeleteFile =
FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withFormat(FileFormat.PUFFIN)
.withPath(sourceDVFile.location())
.withFileSizeInBytes(sourceDVFile.toInputFile().getLength())
.withPartition(FILE_A.partition())
.withRecordCount(sourceDeletes.cardinality())
.withReferencedDataFile(sourceDataFile)
.withContentOffset(sourceBlobMetadata.get(0).offset())
.withContentSizeInBytes(sourceBlobMetadata.get(0).length())
.build();

OutputFile rewrittenDVFile =
Files.localOutput(
temp.resolve("target/metadata/dv-rewritten-" + System.nanoTime() + ".puffin")
.toString());
RewriteTablePathUtil.rewritePositionDeleteFile(
dvDeleteFile, rewrittenDVFile, table.io(), table.spec(), sourcePrefix, targetPrefix, null);

try (PuffinReader reader = Puffin.read(rewrittenDVFile.toInputFile()).build()) {
List<BlobMetadata> rewrittenBlobMetadata = reader.fileMetadata().blobs();
assertThat(rewrittenBlobMetadata).hasSize(2);

BlobMetadata rewrittenSourceBlob = rewrittenBlobMetadata.get(0);
BlobMetadata rewrittenExternalBlob = rewrittenBlobMetadata.get(1);
assertDVBlobMetadata(
rewrittenSourceBlob, targetPrefix + "/data/file-a.parquet", sourceDeletes.cardinality());
assertDVBlobMetadata(rewrittenExternalBlob, externalDataFile, externalDeletes.cardinality());

assertThat(rewrittenSourceBlob.offset()).isEqualTo(dvDeleteFile.contentOffset());
assertThat(rewrittenSourceBlob.length()).isEqualTo(dvDeleteFile.contentSizeInBytes());
assertThat(rewrittenExternalBlob.offset())
.isEqualTo(rewrittenSourceBlob.offset() + rewrittenSourceBlob.length());

List<Pair<BlobMetadata, ByteBuffer>> blobs =
ImmutableList.copyOf(reader.readAll(rewrittenBlobMetadata));
assertThat(ByteBuffers.toByteArray(blobs.get(0).second())).isEqualTo(sourcePayload);
assertThat(ByteBuffers.toByteArray(blobs.get(1).second())).isEqualTo(externalPayload);
}

DeleteFile rewrittenDVDeleteFile =
FileMetadata.deleteFileBuilder(table.spec())
.copy(dvDeleteFile)
.withPath(rewrittenDVFile.location())
.withFileSizeInBytes(rewrittenDVFile.toInputFile().getLength())
.withReferencedDataFile(targetPrefix + "/data/file-a.parquet")
.build();
assertDeletedPositions(DVUtil.readDV(rewrittenDVDeleteFile, table.io()), 1L, 4L);
}

private static Blob newDVBlob(
PositionDeleteIndex deletes, byte[] payload, String referencedDataFile) {
return new Blob(
StandardBlobTypes.DV_V1,
ImmutableList.of(MetadataColumns.ROW_POSITION.fieldId()),
-1L,
-1L,
ByteBuffer.wrap(payload),
null,
ImmutableMap.of(
REFERENCED_DATA_FILE,
referencedDataFile,
CARDINALITY,
String.valueOf(deletes.cardinality())));
}

private static PositionDeleteIndex positionDeleteIndex(long... positions) {
ImmutableList.Builder<Long> builder = ImmutableList.builder();
for (long position : positions) {
builder.add(position);
}

return Deletes.toPositionIndex(CloseableIterable.withNoopClose(builder.build()));
}

private static byte[] serializedDV(PositionDeleteIndex deletes) {
return ByteBuffers.toByteArray(deletes.serialize());
}

private static void assertDVBlobMetadata(
BlobMetadata blobMetadata, String referencedDataFile, long cardinality) {
assertThat(blobMetadata.type()).isEqualTo(StandardBlobTypes.DV_V1);
assertThat(blobMetadata.inputFields()).containsExactly(MetadataColumns.ROW_POSITION.fieldId());
assertThat(blobMetadata.snapshotId()).isEqualTo(-1L);
assertThat(blobMetadata.sequenceNumber()).isEqualTo(-1L);
assertThat(blobMetadata.compressionCodec()).isNull();
assertThat(blobMetadata.offset()).isPositive();
assertThat(blobMetadata.length()).isPositive();
assertThat(blobMetadata.properties())
.containsExactlyInAnyOrderEntriesOf(
ImmutableMap.of(
REFERENCED_DATA_FILE,
referencedDataFile,
CARDINALITY,
String.valueOf(cardinality)));
}

private static void assertDeletedPositions(PositionDeleteIndex deletes, long... positions) {
for (long position : positions) {
assertThat(deletes.isDeleted(position)).isTrue();
}
}
}