19
19
20
20
import java .io .IOException ;
21
21
import java .net .URI ;
22
+ import java .nio .charset .StandardCharsets ;
22
23
import java .util .Collection ;
23
24
import java .util .Iterator ;
24
25
import java .util .List ;
25
26
import java .util .Map ;
27
+ import java .util .UUID ;
26
28
import java .util .concurrent .Callable ;
27
29
import java .util .concurrent .ExecutionException ;
28
30
import java .util .concurrent .Future ;
54
56
import org .apache .gobblin .data .management .copy .extractor .FileAwareInputStreamExtractor ;
55
57
import org .apache .gobblin .data .management .copy .prioritization .FileSetComparator ;
56
58
import org .apache .gobblin .data .management .copy .publisher .CopyEventSubmitterHelper ;
59
+ import org .apache .gobblin .data .management .copy .recovery .RecoveryHelper ;
57
60
import org .apache .gobblin .data .management .copy .replication .ConfigBasedDataset ;
58
61
import org .apache .gobblin .data .management .copy .splitter .DistcpFileSplitter ;
59
62
import org .apache .gobblin .data .management .copy .watermark .CopyableFileWatermarkGenerator ;
@@ -372,6 +375,10 @@ public Void call() {
372
375
373
376
Extract extract = new Extract (Extract .TableType .SNAPSHOT_ONLY , CopyConfiguration .COPY_PREFIX , extractId );
374
377
List <WorkUnit > workUnitsForPartition = Lists .newArrayList ();
378
+ // use random guid for workunits if recovery is disabled. Getting deterministic guid is expensive as it
379
+ // serialises entire workunit to generate guid
380
+ boolean useRandomGuidForWorkUnit = !isRecoveryEnabled (state );
381
+ log .info ("Using " + (useRandomGuidForWorkUnit ? "random" : "deterministic" ) + " guids for workunits" );
375
382
376
383
long fileSize ;
377
384
for (CopyEntity copyEntity : fileSet .getFiles ()) {
@@ -400,7 +407,7 @@ public Void call() {
400
407
workUnit .setProp (SlaEventKeys .PARTITION_KEY , copyEntity .getFileSet ());
401
408
setWorkUnitWeight (workUnit , copyEntity , minWorkUnitWeight );
402
409
setWorkUnitWatermark (workUnit , watermarkGenerator , copyEntity );
403
- computeAndSetWorkUnitGuid (workUnit );
410
+ computeAndSetWorkUnitGuid (workUnit , copyEntity , useRandomGuidForWorkUnit );
404
411
addLineageInfo (copyEntity , workUnit );
405
412
if (copyEntity instanceof CopyableFile ) {
406
413
CopyableFile castedCopyEntity = (CopyableFile ) copyEntity ;
@@ -427,6 +434,10 @@ public Void call() {
427
434
}
428
435
}
429
436
437
/**
 * Tells whether recovery is enabled for the given state.
 *
 * @param state the state to inspect
 * @return {@code true} exactly when {@code state} contains the {@link RecoveryHelper#PERSIST_DIR_KEY} property
 */
+ private static boolean isRecoveryEnabled (State state ) {
438
+ return state .contains (RecoveryHelper .PERSIST_DIR_KEY );
439
+ }
440
+
430
441
private void setWorkUnitWatermark (WorkUnit workUnit , Optional <CopyableFileWatermarkGenerator > watermarkGenerator ,
431
442
CopyEntity copyEntity )
432
443
throws IOException {
@@ -514,11 +525,12 @@ private static void setWorkUnitWeight(WorkUnit workUnit, CopyEntity copyEntity,
514
525
workUnit .setProp (WORK_UNIT_WEIGHT , Long .toString (weight ));
515
526
}
516
527
517
/**
 * Builds a guid for the work unit and stores it through {@code setWorkUnitGuid}.
 *
 * Reading note: in this rendering, lines prefixed with "-" are the removed version, lines prefixed
 * with "+" are their replacement, and bare numbers are line-number residue.
 *
 * @param workUnit the work unit whose guid is set; its converter-classes property seeds the guid
 * @param copyEntity the copy entity whose {@code guid()} is appended when a random guid is not requested
 * @param useRandomGuidForWorkUnit when {@code true}, a guid built from a fresh random UUID is appended
 *        instead of {@code copyEntity.guid()}
 * @throws IOException declared by this method; presumably raised by {@code copyEntity.guid()} or
 *         {@code setWorkUnitGuid} (not visible from here, confirm)
 */
- private static void computeAndSetWorkUnitGuid (WorkUnit workUnit )
528
+ private static void computeAndSetWorkUnitGuid (WorkUnit workUnit , CopyEntity copyEntity , boolean useRandomGuidForWorkUnit )
518
529
throws IOException {
// Seed the guid with the converter-classes property, or with the empty string when it is not set.
519
530
Guid guid = Guid .fromStrings (workUnit .contains (ConfigurationKeys .CONVERTER_CLASSES_KEY ) ? workUnit
520
531
.getProp (ConfigurationKeys .CONVERTER_CLASSES_KEY ) : "" );
// Removed version: appended the result of deserializeCopyEntity(workUnit) to the seed guid.
521
- setWorkUnitGuid (workUnit , guid .append (deserializeCopyEntity (workUnit )));
// Added version: append either a guid made from the UTF-8 bytes of a random UUID string
// (when useRandomGuidForWorkUnit is true) or the copy entity's own guid.
532
+ Guid copyEntityGuid = useRandomGuidForWorkUnit ? new Guid (UUID .randomUUID ().toString ().getBytes (StandardCharsets .UTF_8 )) : copyEntity .guid ();
533
+ setWorkUnitGuid (workUnit , guid .append (copyEntityGuid ));
522
534
}
523
535
524
536
/**
0 commit comments