Skip to content

Commit 6849776

Browse files
authored
[GOBBLIN-2225] Use random id for Workunit if recovery helper is not initialised (#4134)
* Add unique id to CopyEntity instead of computing it from serialised json string * Add unique id to CopyEntity instead of computing it from serialised json string
1 parent 88555c2 commit 6849776

File tree

2 files changed

+18
-4
lines changed

2 files changed

+18
-4
lines changed

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyEntity.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ public static List<CopyEntity> deserializeList(String serialized) {
118118
*/
119119
public static String getSerializedWithNewPackage(String serialized) {
120120
serialized = serialized.replace("\"gobblin.data.management.", "\"org.apache.gobblin.data.management.");
121-
log.debug("Serialized updated copy entity: " + serialized);
121+
if (log.isDebugEnabled()) {
122+
log.debug("Serialized updated copy entity: " + serialized);
123+
}
122124
return serialized;
123125
}
124126

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
import java.io.IOException;
2121
import java.net.URI;
22+
import java.nio.charset.StandardCharsets;
2223
import java.util.Collection;
2324
import java.util.Iterator;
2425
import java.util.List;
2526
import java.util.Map;
27+
import java.util.UUID;
2628
import java.util.concurrent.Callable;
2729
import java.util.concurrent.ExecutionException;
2830
import java.util.concurrent.Future;
@@ -54,6 +56,7 @@
5456
import org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
5557
import org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
5658
import org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
59+
import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
5760
import org.apache.gobblin.data.management.copy.replication.ConfigBasedDataset;
5861
import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
5962
import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
@@ -372,6 +375,10 @@ public Void call() {
372375

373376
Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, CopyConfiguration.COPY_PREFIX, extractId);
374377
List<WorkUnit> workUnitsForPartition = Lists.newArrayList();
378+
// use random guid for workunits if recovery is disabled. Getting deterministic guid is expensive as it
379+
// serialises entire workunit to generate guid
380+
boolean useRandomGuidForWorkUnit = !isRecoveryEnabled(state);
381+
log.info("Using " + (useRandomGuidForWorkUnit ? "random" : "deterministic") + " guids for workunits");
375382

376383
long fileSize;
377384
for (CopyEntity copyEntity : fileSet.getFiles()) {
@@ -400,7 +407,7 @@ public Void call() {
400407
workUnit.setProp(SlaEventKeys.PARTITION_KEY, copyEntity.getFileSet());
401408
setWorkUnitWeight(workUnit, copyEntity, minWorkUnitWeight);
402409
setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
403-
computeAndSetWorkUnitGuid(workUnit, copyEntity);
410+
computeAndSetWorkUnitGuid(workUnit, copyEntity, useRandomGuidForWorkUnit);
404411
addLineageInfo(copyEntity, workUnit);
405412
if (copyEntity instanceof CopyableFile) {
406413
CopyableFile castedCopyEntity = (CopyableFile) copyEntity;
@@ -427,6 +434,10 @@ public Void call() {
427434
}
428435
}
429436

437+
private static boolean isRecoveryEnabled(State state) {
438+
return state.contains(RecoveryHelper.PERSIST_DIR_KEY);
439+
}
440+
430441
private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator,
431442
CopyEntity copyEntity)
432443
throws IOException {
@@ -514,11 +525,12 @@ private static void setWorkUnitWeight(WorkUnit workUnit, CopyEntity copyEntity,
514525
workUnit.setProp(WORK_UNIT_WEIGHT, Long.toString(weight));
515526
}
516527

517-
private static void computeAndSetWorkUnitGuid(WorkUnit workUnit, CopyEntity copyEntity)
528+
private static void computeAndSetWorkUnitGuid(WorkUnit workUnit, CopyEntity copyEntity, boolean useRandomGuidForWorkUnit)
518529
throws IOException {
519530
Guid guid = Guid.fromStrings(workUnit.contains(ConfigurationKeys.CONVERTER_CLASSES_KEY) ? workUnit
520531
.getProp(ConfigurationKeys.CONVERTER_CLASSES_KEY) : "");
521-
setWorkUnitGuid(workUnit, guid.append(copyEntity.guid()));
532+
Guid copyEntityGuid = useRandomGuidForWorkUnit ? new Guid(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)) : copyEntity.guid();
533+
setWorkUnitGuid(workUnit, guid.append(copyEntityGuid));
522534
}
523535

524536
/**

0 commit comments

Comments
 (0)