Skip to content

Commit 86ba02e

Browse files
SK-2258 synchronized errors
1 parent 474e83b commit 86ba02e

File tree

1 file changed

+14
-8
lines changed

1 file changed

+14
-8
lines changed

v3/src/main/java/com/skyflow/vault/controller/VaultController.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,14 @@ public CompletableFuture<com.skyflow.vault.data.InsertResponse> bulkInsertAsync(
7777
setBearerToken();
7878
configureInsertConcurrencyAndBatchSize(insertRequest.getValues().size());
7979
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest request = super.getBulkInsertRequestBody(insertRequest, super.getVaultConfig());
80+
ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit);
8081

81-
List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> futures = this.insertBatchFutures(request);
82+
List<ErrorRecord> errorRecords = new ArrayList<>();
83+
List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> futures = this.insertBatchFutures(request, errorRecords, executor);
8284

8385
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
8486
.thenApply(v -> {
8587
List<Success> successRecords = new ArrayList<>();
86-
List<ErrorRecord> errorRecords = new ArrayList<>();
8788

8889
for (CompletableFuture<com.skyflow.vault.data.InsertResponse> future : futures) {
8990
com.skyflow.vault.data.InsertResponse futureResponse = future.join();
@@ -96,6 +97,7 @@ public CompletableFuture<com.skyflow.vault.data.InsertResponse> bulkInsertAsync(
9697
}
9798
}
9899
}
100+
executor.shutdown(); // Shutdown the executor after all tasks are completed
99101

100102
return new com.skyflow.vault.data.InsertResponse(successRecords, errorRecords, insertRequest.getValues());
101103
});
@@ -113,9 +115,9 @@ private com.skyflow.vault.data.InsertResponse processSync(
113115
LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog());
114116
List<ErrorRecord> errorRecords = new ArrayList<>();
115117
List<Success> successRecords = new ArrayList<>();
118+
ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit);
116119

117-
List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> futures = this.insertBatchFutures(insertRequest);
118-
120+
List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> futures = this.insertBatchFutures(insertRequest, errorRecords, executor);
119121
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
120122
allFutures.join();
121123

@@ -129,6 +131,7 @@ private com.skyflow.vault.data.InsertResponse processSync(
129131
errorRecords.addAll(futureResponse.getErrors());
130132
}
131133
}
134+
executor.shutdown(); // Shutdown the executor after all tasks are completed
132135
}
133136
com.skyflow.vault.data.InsertResponse response = new com.skyflow.vault.data.InsertResponse(successRecords, errorRecords, originalPayload);
134137
LogUtil.printInfoLog(InfoLogs.INSERT_REQUEST_RESOLVED.getLog());
@@ -137,11 +140,9 @@ private com.skyflow.vault.data.InsertResponse processSync(
137140

138141

139142
private List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> insertBatchFutures(
140-
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest
141-
) {
143+
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest, List<ErrorRecord> errorRecords, ExecutorService executor) {
142144
List<InsertRecordData> records = insertRequest.getRecords().get();
143145

144-
ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit);
145146
List<List<InsertRecordData>> batches = Utils.createBatches(records, insertBatchSize);
146147
List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> futures = new ArrayList<>();
147148

@@ -152,7 +153,12 @@ private List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> insertBat
152153
CompletableFuture<com.skyflow.vault.data.InsertResponse> future = CompletableFuture
153154
.supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get()), executor)
154155
.thenApply(response -> formatResponse(response, batchNumber, insertBatchSize))
155-
.exceptionally(ex -> new com.skyflow.vault.data.InsertResponse(null, handleBatchException(ex, batch, batchNumber, batches)));
156+
.exceptionally(ex -> {
157+
synchronized (errorRecords){
158+
errorRecords.addAll(handleBatchException(ex, batch, batchNumber, batches));
159+
}
160+
return null;
161+
});
156162
futures.add(future);
157163
}
158164
} finally {

0 commit comments

Comments
 (0)