Commit 5ad93db

Construct iceberg data files during commit step (#4140)
1 parent 3617c8f commit 5ad93db

File tree

6 files changed: +145 -109 lines


gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java

Lines changed: 4 additions & 16 deletions
@@ -19,7 +19,6 @@
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -28,7 +27,6 @@
 
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.util.SerializationUtil;
 
 import com.github.rholder.retry.Attempt;
 import com.github.rholder.retry.RetryException;
@@ -38,6 +36,7 @@
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.commit.CommitStep;
@@ -59,8 +58,8 @@
 public class IcebergOverwritePartitionsStep implements CommitStep {
   private final String destTableIdStr;
   private final Properties properties;
-  // Data files are kept as a list of base64 encoded strings for optimised de-serialization.
-  private final List<String> base64EncodedDataFiles;
+  // data files are populated once all the copy tasks are done. Each IcebergPartitionCopyableFile has a serialized data file
+  @Setter private List<DataFile> dataFiles;
   private final String partitionColName;
   private final String partitionValue;
   public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
@@ -74,14 +73,12 @@ public class IcebergOverwritePartitionsStep implements CommitStep {
    * Constructs an {@code IcebergReplacePartitionsStep} with the specified parameters.
    *
    * @param destTableIdStr the identifier of the destination table as a string
-   * @param base64EncodedDataFiles [from List<DataFiles>] the serialized data files to be used for replacing partitions
    * @param properties the properties containing configuration
    */
-  public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, List<String> base64EncodedDataFiles, Properties properties) {
+  public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, Properties properties) {
     this.destTableIdStr = destTableIdStr;
     this.partitionColName = partitionColName;
     this.partitionValue = partitionValue;
-    this.base64EncodedDataFiles = base64EncodedDataFiles;
     this.properties = properties;
   }
 
@@ -103,7 +100,6 @@ public void execute() throws IOException {
     // our copying. any new data written in the meanwhile to THE SAME partitions we are about to overwrite will be
     // clobbered and replaced by the copy entities from our execution.
     IcebergTable destTable = createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
-    List<DataFile> dataFiles = getDataFiles();
     try {
       log.info("~{}~ Starting partition overwrite - partition: {}; value: {}; numDataFiles: {}; path[0]: {}",
           this.destTableIdStr,
@@ -140,14 +136,6 @@
     }
   }
 
-  private List<DataFile> getDataFiles() {
-    List<DataFile> dataFiles = new ArrayList<>(base64EncodedDataFiles.size());
-    for (String base64EncodedDataFile : base64EncodedDataFiles) {
-      dataFiles.add(SerializationUtil.deserializeFromBase64(base64EncodedDataFile));
-    }
-    return dataFiles;
-  }
-
   protected IcebergCatalog createDestinationCatalog() throws IOException {
     return IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION);
   }
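
Note: with this change the commit step no longer receives pre-serialized data files through its constructor; the destination DataFiles are attached after all copy tasks finish, via the Lombok-generated setter. A minimal wiring sketch follows (the table identifier, partition column, and partition value are illustrative, and the caller that collects the DataFiles is assumed to be the publish/commit machinery, which is not shown in this hunk):

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.apache.iceberg.DataFile;

import org.apache.gobblin.data.management.copy.iceberg.IcebergOverwritePartitionsStep;

public class OverwriteStepWiringSketch {
  // Illustrative only: the table id, partition column, and partition value are made up.
  static void commitPartition(List<DataFile> destDataFiles, Properties props) throws IOException {
    IcebergOverwritePartitionsStep step = new IcebergOverwritePartitionsStep(
        "db.dest_table", "datepartition", "2024-01-01", props);
    // Data files are attached once all copy tasks are done (Lombok @Setter generates setDataFiles).
    step.setDataFiles(destDataFiles);
    // execute() opens the destination table and overwrites the partition with these files.
    step.execute();
  }
}
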
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.util.SerializationUtil;
+
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyableFile;
+
+
+/**
+ * An extension of {@link CopyableFile} that includes a base64-encoded Iceberg {@link DataFile}.
+ */
+@Getter
+@Setter
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@EqualsAndHashCode(callSuper = true)
+@Slf4j
+public class IcebergPartitionCopyableFile extends CopyableFile {
+
+  /**
+   * Base64-encoded Iceberg {@link DataFile} associated with this copyable file.
+   */
+  private String base64EncodedDataFile;
+
+  public IcebergPartitionCopyableFile(CopyableFile copyableFile, DataFile dataFile) {
+    super(copyableFile.getOrigin(), copyableFile.getDestination(), copyableFile.getDestinationOwnerAndPermission(),
+        copyableFile.getAncestorsOwnerAndPermission(), copyableFile.getChecksum(), copyableFile.getPreserve(),
+        copyableFile.getFileSet(), copyableFile.getOriginTimestamp(), copyableFile.getUpstreamTimestamp(),
+        copyableFile.getAdditionalMetadata(), copyableFile.datasetOutputPath, copyableFile.getDataFileVersionStrategy());
+    this.base64EncodedDataFile = SerializationUtil.serializeToBase64(dataFile);
+  }
+
+  public DataFile getDataFile() {
+    return SerializationUtil.deserializeFromBase64(base64EncodedDataFile);
+  }
+}
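
The new class keeps the Iceberg DataFile as a base64 string so the copy entity stays serializable through Gobblin's existing CopyableFile machinery, and only rehydrates it when getDataFile() is called at commit time. A minimal round-trip sketch using the same SerializationUtil calls (dataFile is a stand-in for a DataFile built elsewhere, e.g. via DataFiles.builder):

import org.apache.iceberg.DataFile;
import org.apache.iceberg.util.SerializationUtil;

public class DataFileBase64Sketch {
  // Mirrors what IcebergPartitionCopyableFile does internally.
  static String encode(DataFile dataFile) {
    return SerializationUtil.serializeToBase64(dataFile);                   // stored on the copy entity
  }

  static DataFile decode(String base64EncodedDataFile) {
    return SerializationUtil.deserializeFromBase64(base64EncodedDataFile);  // recovered at commit time
  }
}
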

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java

Lines changed: 51 additions & 72 deletions
@@ -20,15 +20,13 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -42,9 +40,7 @@
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.util.SerializationUtil;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.ImmutableList;
 import com.google.common.base.Preconditions;
@@ -101,48 +97,16 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     // TODO: Refactor the IcebergDataset::generateCopyEntities to avoid code duplication
     // Differences are getting data files, copying ancestor permission and adding post publish steps
     String fileSet = this.getFileSetId();
-    List<CopyEntity> copyEntities = Lists.newArrayList();
     IcebergTable srcIcebergTable = getSrcIcebergTable();
     List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
-    Map<Path, DataFile> destDataFileBySrcPath = calcDestDataFileBySrcPath(srcDataFiles);
-    Configuration defaultHadoopConfiguration = new Configuration();
-
-    for (Map.Entry<Path, FileStatus> entry : calcSrcFileStatusByDestFilePath(destDataFileBySrcPath).entrySet()) {
-      Path destPath = entry.getKey();
-      FileStatus srcFileStatus = entry.getValue();
-      // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
-      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
-
-      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
-          actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
-          .fileSet(fileSet)
-          .datasetOutputPath(targetFs.getUri().getPath())
-          .build();
-
-      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
-      fileEntity.setDestinationData(getDestinationDataset(targetFs));
-      copyEntities.add(fileEntity);
-    }
-
-    // Adding this check to avoid adding post publish step when there are no files to copy.
-    List<DataFile> destDataFiles = new ArrayList<>(destDataFileBySrcPath.values());
-    if (CollectionUtils.isNotEmpty(destDataFiles)) {
-      copyEntities.add(createOverwritePostPublishStep(destDataFiles));
-    }
-
-    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
-    return copyEntities;
-  }
 
-  private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile> srcDataFiles)
-      throws IcebergTable.TableNotFoundException {
-    String fileSet = this.getFileSetId();
-    Map<Path, DataFile> destDataFileBySrcPath = new ConcurrentHashMap<>(srcDataFiles.size());
     if (srcDataFiles.isEmpty()) {
       log.warn("~{}~ found no data files for partition col : {} with partition value : {} to copy", fileSet,
           this.partitionColumnName, this.partitionColValue);
-      return destDataFileBySrcPath;
+      return new ArrayList<>(0);
     }
+
+    // get source & destination write data locations to update data file paths
     TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
     TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
     PartitionSpec partitionSpec = destTableMetadata.spec();
@@ -160,17 +124,58 @@ private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile> srcDataFile
           destWriteDataLocation
       );
     }
-    srcDataFiles.forEach(dataFile -> {
+
+    List<CopyEntity> copyEntities = getIcebergParitionCopyEntities(targetFs, srcDataFiles, srcWriteDataLocation, destWriteDataLocation, partitionSpec, copyConfig);
+    // Adding this check to avoid adding post publish step when there are no files to copy.
+    if (CollectionUtils.isNotEmpty(copyEntities)) {
+      copyEntities.add(createOverwritePostPublishStep());
+    }
+
+    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
+  private List<CopyEntity> getIcebergParitionCopyEntities(
+      FileSystem targetFs,
+      List<DataFile> srcDataFiles,
+      String srcWriteDataLocation,
+      String destWriteDataLocation,
+      PartitionSpec partitionSpec,
+      CopyConfiguration copyConfig) {
+    String fileSet = this.getFileSetId();
+    Configuration defaultHadoopConfiguration = new Configuration();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(new ArrayList<>(srcDataFiles.size()));
+    Function<Path, FileStatus> getFileStatus = CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
+
+    srcDataFiles.parallelStream().forEach(dataFile -> {
+      // create destination data file from source data file by replacing the source path with destination path
       String srcFilePath = dataFile.path().toString();
       Path updatedDestFilePath = relocateDestPath(srcFilePath, srcWriteDataLocation, destWriteDataLocation);
       log.debug("~{}~ Path changed from Src : {} to Dest : {}", fileSet, srcFilePath, updatedDestFilePath);
-      destDataFileBySrcPath.put(new Path(srcFilePath), DataFiles.builder(partitionSpec)
+      DataFile destDataFile = DataFiles.builder(partitionSpec)
          .copy(dataFile)
          .withPath(updatedDestFilePath.toString())
-         .build());
+         .build();
+
+      // get file status of source file
+      FileStatus srcFileStatus = getFileStatus.apply(new Path(srcFilePath));
+      try {
+        // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
+        FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+        // create copyable file entity
+        CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(actualSourceFs, srcFileStatus,
+            targetFs.makeQualified(updatedDestFilePath), copyConfig).fileSet(fileSet)
+            .datasetOutputPath(targetFs.getUri().getPath()).build();
+        fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+        fileEntity.setDestinationData(getDestinationDataset(targetFs));
+        // add corresponding data file to each copyable iceberg partition file
+        IcebergPartitionCopyableFile icebergPartitionCopyableFile = new IcebergPartitionCopyableFile(fileEntity, destDataFile);
+        copyEntities.add(icebergPartitionCopyableFile);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     });
-    log.info("~{}~ created {} destination data files", fileSet, destDataFileBySrcPath.size());
-    return destDataFileBySrcPath;
+    return copyEntities;
   }
 
   private Path relocateDestPath(String curPathStr, String prefixToBeReplaced, String prefixToReplaceWith) {
@@ -186,43 +191,17 @@ private Path addUUIDToPath(String filePathStr) {
     return new Path(fileDir, newFileName);
   }
 
-  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath)
-      throws IOException {
-    Function<Path, FileStatus> getFileStatus = CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
-    Map<Path, FileStatus> srcFileStatusByDestFilePath = new ConcurrentHashMap<>();
-    try {
-      srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
-          .parallelStream()
-          .collect(Collectors.toConcurrentMap(entry -> new Path(entry.getValue().path().toString()),
-              entry -> getFileStatus.apply(entry.getKey())));
-    } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
-      wrapper.rethrowWrapped();
-    }
-    return srcFileStatusByDestFilePath;
-  }
-
-  private PostPublishStep createOverwritePostPublishStep(List<DataFile> destDataFiles) {
-    List<String> serializedDataFiles = getBase64EncodedDataFiles(destDataFiles);
-
+  private PostPublishStep createOverwritePostPublishStep() {
     IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
         this.getDestIcebergTable().getTableId().toString(),
         this.partitionColumnName,
         this.partitionColValue,
-        serializedDataFiles,
         this.properties
     );
 
     return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), icebergOverwritePartitionStep, 0);
   }
 
-  private List<String> getBase64EncodedDataFiles(List<DataFile> destDataFiles) {
-    List<String> base64EncodedDataFiles = new ArrayList<>(destDataFiles.size());
-    for (DataFile dataFile : destDataFiles) {
-      base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
-    }
-    return base64EncodedDataFiles;
-  }
-
   private Predicate<StructLike> createPartitionFilterPredicate() throws IOException {
     //TODO: Refactor it later using factory or other way to support different types of filter predicate
     // Also take into consideration creation of Expression Filter to be used in overwrite api
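
The rewritten generateCopyEntities path fans out over the source data files with a parallel stream and collects results into a synchronized list, so per-file FileStatus lookups and CopyableFile construction can run concurrently. A simplified, self-contained sketch of that collection pattern (the string mapping below is a placeholder for the real per-file work):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ParallelCollectSketch {
  public static void main(String[] args) {
    // Placeholder source paths; in the dataset code these are Iceberg DataFile paths.
    List<String> srcPaths = Arrays.asList("/src/data/part-0.parquet", "/src/data/part-1.parquet");
    // synchronizedList because parallelStream().forEach may add from multiple threads.
    List<String> destPaths = Collections.synchronizedList(new ArrayList<>(srcPaths.size()));
    srcPaths.parallelStream().forEach(p -> destPaths.add(p.replace("/src/data/", "/dest/data/")));
    System.out.println(destPaths);
  }
}
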
