[GOBBLIN-2226] Construct iceberg data files during commit step #4140
Conversation
public String getBase64EncodedDataFile() {
  return this.base64EncodedDataFile;
}
this can be removed
 private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(
     Map<Path, DataFile> destDataFileBySrcPath,
     Map<Path, Path> destPathToSrcPath) throws IOException {
   Function<Path, FileStatus> getFileStatus = CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
-  Map<Path, FileStatus> srcFileStatusByDestFilePath = new ConcurrentHashMap<>();
+  final Map<Path, FileStatus> srcFileStatusByDestFilePath = new ConcurrentHashMap<>();
   try {
-    srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
+    destDataFileBySrcPath.entrySet()
         .parallelStream()
-        .collect(Collectors.toConcurrentMap(entry -> new Path(entry.getValue().path().toString()),
-            entry -> getFileStatus.apply(entry.getKey())));
+        .forEach(entry -> {
+          Path destPath = new Path(entry.getValue().path().toString());
+          destPathToSrcPath.put(destPath, entry.getKey());
+          srcFileStatusByDestFilePath.put(destPath, getFileStatus.apply(entry.getKey()));
+        });
nit: Let's move this logic to the caller itself and see if we can parallelize that for-loop to reduce runtime.
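For illustration, a rough sketch of that suggestion: inline the pairing into the caller as a single parallel pass (the names are taken from the diff above; the caller-side placement itself is hypothetical):

// Hypothetical caller-side version: both maps are filled in one parallel
// pass, making the calcSrcFileStatusByDestFilePath helper unnecessary.
Function<Path, FileStatus> getFileStatus =
    CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
Map<Path, Path> destPathToSrcPath = new ConcurrentHashMap<>();
Map<Path, FileStatus> srcFileStatusByDestFilePath = new ConcurrentHashMap<>();
destDataFileBySrcPath.entrySet().parallelStream().forEach(entry -> {
  Path destPath = new Path(entry.getValue().path().toString());
  destPathToSrcPath.put(destPath, entry.getKey());
  srcFileStatusByDestFilePath.put(destPath, getFileStatus.apply(entry.getKey()));
});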
 List<DataFile> icebergDataFiles = new ArrayList<>();
 for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) {
   if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
     wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
   }
   CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
-  if (copyEntity instanceof CopyableFile) {
+  if (copyEntity instanceof CopyableFile || copyEntity instanceof IcebergPartitionCopyableFile) {
+    if (copyEntity instanceof IcebergPartitionCopyableFile) {
+      icebergDataFiles.add(((IcebergPartitionCopyableFile) copyEntity).getDataFile());
Let's add a comment to describe why this is done.
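For example, the requested comment could summarize the rationale from the PR description (wording is illustrative, not the PR's actual comment):

// Each IcebergPartitionCopyableFile carries its own serialized Iceberg
// DataFile, so data files are decoded here one work unit at a time instead
// of serializing/deserializing the whole set at once in the post-publish step.
if (copyEntity instanceof IcebergPartitionCopyableFile) {
  icebergDataFiles.add(((IcebergPartitionCopyableFile) copyEntity).getDataFile());
}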
public DataFile getDataFile() {
  return SerializationUtil.deserializeFromBase64(base64EncodedDataFile);
}
nit: Let's add a Javadoc here too.
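One possible Javadoc, inferred from the method body (illustrative wording):

/**
 * Decodes the base64-encoded payload carried by this copyable file and
 * returns it as an Iceberg {@link DataFile}.
 *
 * @return the deserialized {@link DataFile} for this partition file
 */
public DataFile getDataFile() {
  return SerializationUtil.deserializeFromBase64(base64EncodedDataFile);
}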
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
For Iceberg partition copy, instead of adding all the data files to the post-publish step up front, this PR attaches a data file to each Iceberg partition copyable file during work unit generation and then gathers all the data files into the post-publish step during the commit step, avoiding serialization and deserialization of all the data files at once.
Changes:
During work unit generation, a CopyableFile is created and serialized. This PR adds an extension of CopyableFile, IcebergPartitionCopyableFile, which carries the corresponding data file.
During the commit step, in the Iceberg post-publish step, all the data files are collected from the Iceberg copyable files and then added to IcebergOverwritePartitionsStep (see the sketch after this list).
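A minimal sketch of how the two sides fit together, assuming Iceberg's org.apache.iceberg.util.SerializationUtil for the base64 round trip (the setter and field layout here are illustrative, not the PR's exact code):

import org.apache.iceberg.DataFile;
import org.apache.iceberg.util.SerializationUtil;

public class IcebergPartitionCopyableFile extends CopyableFile {
  // One serialized DataFile per copyable file, so nothing is
  // bulk-serialized into the post-publish step up front.
  private String base64EncodedDataFile;

  // Work-unit generation side: encode the data file once, when the
  // copyable file is built.
  public void setDataFile(DataFile dataFile) {
    this.base64EncodedDataFile = SerializationUtil.serializeToBase64(dataFile);
  }

  // Commit-step side: decoded lazily, one file at a time, by the loop shown
  // in the conversation above before handing off to IcebergOverwritePartitionsStep.
  public DataFile getDataFile() {
    return SerializationUtil.deserializeFromBase64(base64EncodedDataFile);
  }
}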
Tests
Commits