Skip to content

Commit ae75c8d

Browse files
SK-2302 add changes for bulk async changes
1 parent 10d960f commit ae75c8d

File tree

12 files changed

+425
-46
lines changed

12 files changed

+425
-46
lines changed

common/src/main/java/com/skyflow/errors/ErrorMessage.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@ public enum ErrorMessage {
1111
ConnectionIdNotInConfigList("%s0 Validation error. ConnectionId is missing from the config. Specify the connectionIds from configs."),
1212
EmptyCredentials("%s0 Validation error. Invalid credentials. Credentials must not be empty."),
1313
TableSpecifiedInRequestAndRecordObject("%s0 Validation error. Table name cannot be specified at both the request and record levels. Please specify the table name in only one place."),
14-
UpsertAtRecordLevel("%s0 Validation error. Upsert specify "),
1514
UpsertTableRequestAtRecordLevel("%s0 Validation error. Table name should be present at each record level when upsert is present at record level."),
16-
UpsertTableRequestAtRequestLevel("%S0 Validation error. Upsert should be present at each record level when table name is present at record level."),
15+
UpsertTableRequestAtRequestLevel("%s0 Validation error. Upsert should be present at each record level when table name is present at record level."),
1716
TableNotSpecifiedInRequestAndRecordObject("%s0 Validation error. Table name is missing. Table name should be specified at one place either at the request level or record level. Please specify the table name at one place."),
1817
// Vault config
1918
InvalidVaultId("%s0 Initialization failed. Invalid vault ID. Specify a valid vault ID."),
@@ -64,6 +63,10 @@ public enum ErrorMessage {
6463
TableKeyError("%s0 Validation error. 'table' key is missing from the payload. Specify a 'table' key."),
6564
EmptyTable("%s0 Validation error. 'table' can't be empty. Specify a table."),
6665
ValuesKeyError("%s0 Validation error. 'values' key is missing from the payload. Specify a 'values' key."),
66+
EmptyRecords("%s0 Validation error. 'values' can't be empty. Specify values."),
67+
EmptyKeyInRecords("%s0 Validation error. Invalid key in values. Specify a valid key."),
68+
EmptyValueInRecords("%s0 Validation error. Invalid value in values. Specify a valid value."),
69+
RecordsKeyError("%s0 Validation error. 'values' key is missing from the payload. Specify a 'values' key."),
6770
EmptyValues("%s0 Validation error. 'values' can't be empty. Specify values."),
6871
EmptyKeyInValues("%s0 Validation error. Invalid key in values. Specify a valid key."),
6972
EmptyValueInValues("%s0 Validation error. Invalid value in values. Specify a valid value."),

common/src/main/java/com/skyflow/logs/ErrorLogs.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public enum ErrorLogs {
5151
EMPTY_TABLE_NAME("Invalid %s1 request. Table name can not be empty."),
5252
VALUES_IS_REQUIRED("Invalid %s1 request. Values are required."),
5353
EMPTY_VALUES("Invalid %s1 request. Values can not be empty."),
54+
RECORDS_IS_REQUIRED("Invalid %s1 request. Records are required."),
55+
EMPTY_RECORDS("Invalid %s1 request. Records can not be empty."),
5456
RECORD_SIZE_EXCEED("Maximum number of records exceeded. The limit is 10000."),
5557
TOKENS_SIZE_EXCEED("Maximum number of tokens exceeded. The limit is 10000."),
5658
EMPTY_OR_NULL_VALUE_IN_VALUES("Invalid %s1 request. Value can not be null or empty in values for key \"%s2\"."),

v3/src/main/java/com/skyflow/VaultClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.skyflow;
22

3+
import java.util.ArrayList;
4+
import java.util.List;
5+
36
import com.skyflow.config.Credentials;
47
import com.skyflow.config.VaultConfig;
58
import com.skyflow.enums.UpdateType;
@@ -27,9 +30,6 @@
2730
import okhttp3.OkHttpClient;
2831
import okhttp3.Request;
2932

30-
import java.util.ArrayList;
31-
import java.util.List;
32-
3333

3434
public class VaultClient {
3535
private final VaultConfig vaultConfig;
@@ -145,9 +145,9 @@ protected InsertRequest getBulkInsertRequestBody(com.skyflow.vault.data.InsertRe
145145
if (value.getUpsert() != null && !value.getUpsert().isEmpty()){
146146
if (value.getUpsertType() != null) {
147147
EnumUpdateType updateType = null;
148-
if (request.getUpsertType() == UpdateType.REPLACE) {
148+
if (value.getUpsertType() == UpdateType.REPLACE) {
149149
updateType = EnumUpdateType.REPLACE;
150-
} else if (request.getUpsertType() == UpdateType.UPDATE) {
150+
} else if (value.getUpsertType() == UpdateType.UPDATE) {
151151
updateType = EnumUpdateType.UPDATE;
152152
}
153153
Upsert upsert = Upsert.builder().uniqueColumns(value.getUpsert()).updateType(updateType).build();

v3/src/main/java/com/skyflow/enums/UpdateType.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.skyflow.enums;
22

3-
import com.fasterxml.jackson.annotation.JsonValue;
4-
53
public enum UpdateType {
64
UPDATE("UPDATE"),
75

v3/src/main/java/com/skyflow/utils/Constants.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ public final class Constants extends BaseConstants {
99
public static final Integer MAX_INSERT_BATCH_SIZE = 1000;
1010
public static final Integer INSERT_CONCURRENCY_LIMIT = 10;
1111
public static final Integer MAX_INSERT_CONCURRENCY_LIMIT = 10;
12-
1312
public static final Integer DETOKENIZE_BATCH_SIZE = 50;
1413
public static final Integer DETOKENIZE_CONCURRENCY_LIMIT = 10;
1514
public static final Integer MAX_DETOKENIZE_BATCH_SIZE = 1000;

v3/src/main/java/com/skyflow/utils/validations/Validations.java

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,33 +28,22 @@ public static void validateInsertRequest(InsertRequest insertRequest) throws Sky
2828
ArrayList<InsertRecord> values = insertRequest.getRecords();
2929
if (values == null) {
3030
LogUtil.printErrorLog(Utils.parameterizedString(
31-
ErrorLogs.VALUES_IS_REQUIRED.getLog(), InterfaceName.INSERT.getName()
31+
ErrorLogs.RECORDS_IS_REQUIRED.getLog(), InterfaceName.INSERT.getName()
3232
));
33-
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.ValuesKeyError.getMessage());
33+
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.RecordsKeyError.getMessage());
3434
} else if (values.isEmpty()) {
3535
LogUtil.printErrorLog(Utils.parameterizedString(
36-
ErrorLogs.EMPTY_VALUES.getLog(), InterfaceName.INSERT.getName()
36+
ErrorLogs.EMPTY_RECORDS.getLog(), InterfaceName.INSERT.getName()
3737
));
38-
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyValues.getMessage());
38+
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyRecords.getMessage());
3939
} else if (values.size() > 10000) {
4040
LogUtil.printErrorLog(ErrorLogs.RECORD_SIZE_EXCEED.getLog());
4141
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.RecordSizeExceedError.getMessage());
4242
}
43-
// if (table == null) {
44-
// LogUtil.printErrorLog(Utils.parameterizedString(
45-
// ErrorLogs.TABLE_IS_REQUIRED.getLog(), InterfaceName.INSERT.getName()
46-
// ));
47-
// throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.TableKeyError.getMessage());
48-
// } else if (table.trim().isEmpty()) {
49-
// LogUtil.printErrorLog(Utils.parameterizedString(
50-
// ErrorLogs.EMPTY_TABLE_NAME.getLog(), InterfaceName.INSERT.getName()
51-
// ));
52-
// throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyTable.getMessage());
53-
// }
5443
// table check if specified for both
5544
if (insertRequest.getTable() != null && !table.trim().isEmpty()){ // if table name specified at both place
5645
for (InsertRecord valuesMap : values) {
57-
if (valuesMap.getTable() != null || !valuesMap.getTable().trim().isEmpty()){
46+
if (valuesMap.getTable() != null && !valuesMap.getTable().trim().isEmpty()){
5847
LogUtil.printErrorLog(Utils.parameterizedString(
5948
ErrorLogs.TABLE_SPECIFIED_AT_BOTH_PLACE.getLog(), InterfaceName.INSERT.getName()
6049
));
@@ -76,6 +65,12 @@ public static void validateInsertRequest(InsertRequest insertRequest) throws Sky
7665
// upsert check 1
7766
if (insertRequest.getTable() != null && !table.trim().isEmpty()){ // if table name specified at both place
7867
for (InsertRecord valuesMap : values) {
68+
if (valuesMap.getUpsert() != null && valuesMap.getUpsert().isEmpty()) {
69+
LogUtil.printErrorLog(Utils.parameterizedString(
70+
ErrorLogs.EMPTY_UPSERT_VALUES.getLog(), InterfaceName.INSERT.getName()
71+
));
72+
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyUpsertValues.getMessage());
73+
}
7974
if (valuesMap.getUpsert() != null && !valuesMap.getUpsert().isEmpty()){
8075
LogUtil.printErrorLog(Utils.parameterizedString(
8176
ErrorLogs.UPSERT_TABLE_REQUEST_AT_RECORD_LEVEL.getLog(), InterfaceName.INSERT.getName()
@@ -94,12 +89,12 @@ public static void validateInsertRequest(InsertRequest insertRequest) throws Sky
9489
}
9590
}
9691

97-
// else if (upsert != null && upsert.isEmpty()) {
98-
// LogUtil.printErrorLog(Utils.parameterizedString(
99-
// ErrorLogs.EMPTY_UPSERT_VALUES.getLog(), InterfaceName.INSERT.getName()
100-
// ));
101-
// throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyUpsertValues.getMessage());
102-
// }
92+
if (insertRequest.getUpsert() != null && insertRequest.getUpsert().isEmpty()) {
93+
LogUtil.printErrorLog(Utils.parameterizedString(
94+
ErrorLogs.EMPTY_UPSERT_VALUES.getLog(), InterfaceName.INSERT.getName()
95+
));
96+
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyUpsertValues.getMessage());
97+
}
10398

10499
for (InsertRecord valuesMap : values) {
105100
if (valuesMap != null ) {

v3/src/main/java/com/skyflow/vault/data/DetokenizeResponse.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
import java.util.stream.Collectors;
1010

1111
public class DetokenizeResponse {
12-
@Expose(serialize = true)
13-
private List<DetokenizeResponseObject> success;
1412
@Expose(serialize = true)
1513
private DetokenizeSummary summary;
1614

15+
@Expose(serialize = true)
16+
private List<DetokenizeResponseObject> success;
17+
1718
@Expose(serialize = true)
1819
private List<ErrorRecord> errors;
1920

v3/src/main/java/com/skyflow/vault/data/InsertRecord.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.skyflow.vault.data;
22

3+
import com.skyflow.enums.UpdateType;
4+
35
import java.util.List;
46
import java.util.Map;
57

@@ -23,7 +25,7 @@ public List<String> getUpsert() {
2325
return this.builder.upsert;
2426
}
2527

26-
public String getUpsertType() {
28+
public UpdateType getUpsertType() {
2729
return this.builder.upsertType;
2830
}
2931

@@ -32,7 +34,7 @@ public static final class InsertRecordBuilder {
3234
private String table;
3335
private Map<String, Object> data;
3436
private List<String> upsert;
35-
private String upsertType;
37+
private UpdateType upsertType;
3638

3739
public InsertRecordBuilder table(String table) {
3840
this.table = table;
@@ -49,7 +51,7 @@ public InsertRecordBuilder upsert(List<String> upsert) {
4951
return this;
5052
}
5153

52-
public InsertRecordBuilder upsertType(String upsertType) {
54+
public InsertRecordBuilder upsertType(UpdateType upsertType) {
5355
this.upsertType = upsertType;
5456
return this;
5557
}

v3/src/main/java/com/skyflow/vault/data/Success.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public Success(int index, String skyflow_id, Map<String, List<Token>> tokens, Ma
2626
this.skyflow_id = skyflow_id;
2727
this.tokens = tokens;
2828
this.data = data;
29+
this.table = table;
2930
}
3031

3132
public String getSkyflowId() {

v3/test/java/com/skyflow/VaultClientTests.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,22 @@
33
import com.skyflow.config.Credentials;
44
import com.skyflow.config.VaultConfig;
55
import com.skyflow.enums.Env;
6+
import com.skyflow.enums.UpdateType;
67
import com.skyflow.errors.ErrorCode;
78
import com.skyflow.errors.SkyflowException;
89
import com.skyflow.generated.rest.resources.recordservice.RecordserviceClient;
10+
import com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest;
11+
import com.skyflow.generated.rest.types.InsertRecordData;
912
import com.skyflow.utils.Constants;
1013
import com.skyflow.utils.SdkVersion;
14+
import com.skyflow.vault.data.InsertRecord;
1115
import io.github.cdimascio.dotenv.Dotenv;
1216
import org.junit.Assert;
1317
import org.junit.BeforeClass;
1418
import org.junit.Test;
1519

20+
import java.util.*;
21+
1622
public class VaultClientTests {
1723
private static final String INVALID_EXCEPTION_THROWN = "Should not have thrown any exception";
1824
private static VaultClient vaultClient;
@@ -151,6 +157,123 @@ public void testPrioritiseCredentialsWithCommonCredentials() throws Exception {
151157
Assert.assertEquals(credentials, getPrivateField(vaultClient, "finalCredentials"));
152158
}
153159

160+
@Test
161+
public void testEmptyRecords() {
162+
com.skyflow.vault.data.InsertRequest request =
163+
com.skyflow.vault.data.InsertRequest.builder().records(new ArrayList<>()).build();
164+
InsertRequest result = vaultClient.getBulkInsertRequestBody(request, vaultConfig);
165+
Assert.assertTrue(result.getRecords().get().isEmpty());
166+
}
167+
168+
@Test
169+
public void testTableAtRequestLevel() {
170+
Map<String, Object> data = new HashMap<>();
171+
data.put("key", "value");
172+
InsertRecord record = InsertRecord.builder().data(data).build();
173+
ArrayList<InsertRecord> records = new ArrayList<>();
174+
records.add(record);
175+
176+
com.skyflow.vault.data.InsertRequest request =
177+
com.skyflow.vault.data.InsertRequest.builder()
178+
.table("table1")
179+
.records(records)
180+
.build();
181+
182+
InsertRequest result = vaultClient.getBulkInsertRequestBody(request, vaultConfig);
183+
Assert.assertEquals("table1", result.getTableName().get());
184+
List<InsertRecordData> recordData = result.getRecords().get();
185+
Assert.assertEquals("value", recordData.get(0).getData().get().get("key"));
186+
}
187+
188+
@Test
189+
public void testTableAtRecordLevel() {
190+
Map<String, Object> data = new HashMap<>();
191+
data.put("key", "value");
192+
InsertRecord record = InsertRecord.builder().data(data).table("table2").build();
193+
194+
ArrayList<InsertRecord> records = new ArrayList<>();
195+
records.add(record);
196+
197+
com.skyflow.vault.data.InsertRequest request =
198+
com.skyflow.vault.data.InsertRequest.builder()
199+
.records(records)
200+
.build();
201+
202+
InsertRequest result = vaultClient.getBulkInsertRequestBody(request, vaultConfig);
203+
Assert.assertEquals("table2", result.getRecords().get().get(0).getTableName().get());
204+
}
205+
206+
@Test
207+
public void testUpsertAtRequestLevel() {
208+
Map<String, Object> data = new HashMap<>();
209+
data.put("key", "value");
210+
InsertRecord record = InsertRecord.builder().data(data).build();
211+
ArrayList<InsertRecord> records = new ArrayList<>();
212+
records.add(record);
213+
214+
215+
List<String> upsertColumns = Arrays.asList("col1");
216+
com.skyflow.vault.data.InsertRequest request =
217+
com.skyflow.vault.data.InsertRequest.builder()
218+
.records(records)
219+
.upsert(upsertColumns)
220+
.upsertType(UpdateType.REPLACE)
221+
.build();
222+
223+
InsertRequest result = vaultClient.getBulkInsertRequestBody(request, vaultConfig);
224+
Assert.assertNotNull(result.getUpsert());
225+
Assert.assertEquals("col1", result.getUpsert().get().getUniqueColumns().get().get(0));
226+
Assert.assertEquals("REPLACE", result.getUpsert().get().getUpdateType().get().name());
227+
}
228+
229+
@Test
230+
public void testUpsertAtRecordLevel() {
231+
Map<String, Object> data = new HashMap<>();
232+
data.put("key", "value");
233+
List<String> upsertColumns = Arrays.asList("col2");
234+
InsertRecord record = InsertRecord.builder().data(data).upsert(upsertColumns).upsertType(UpdateType.UPDATE).build();
235+
System.out.println("record upsert: " + record.getUpsertType());
236+
ArrayList<InsertRecord> records = new ArrayList<>();
237+
records.add(record);
238+
239+
240+
com.skyflow.vault.data.InsertRequest request =
241+
com.skyflow.vault.data.InsertRequest.builder()
242+
.records(records)
243+
.build();
244+
245+
InsertRequest result = vaultClient.getBulkInsertRequestBody(request, vaultConfig);
246+
Assert.assertNotNull(result.getRecords().get().get(0).getUpsert());
247+
System.out.println("result upsert: " + result.getRecords().get().get(0).getUpsert());
248+
Assert.assertEquals("col2", result.getRecords().get().get(0).getUpsert().get().getUniqueColumns().get().get(0));
249+
Assert.assertEquals("UPDATE", result.getRecords().get().get(0).getUpsert().get().getUpdateType().get().name());
250+
}
251+
252+
@Test
253+
public void testMixedTableAndUpsertLevels() {
254+
Map<String, Object> data = new HashMap<>();
255+
data.put("key", "value");
256+
List<String> upsertColumns = Arrays.asList("col3");
257+
InsertRecord record = InsertRecord.builder().data(data).table("table3").upsert(upsertColumns).build();
258+
ArrayList<InsertRecord> records = new ArrayList<>();
259+
records.add(record);
260+
261+
262+
com.skyflow.vault.data.InsertRequest request =
263+
com.skyflow.vault.data.InsertRequest.builder()
264+
.table("table4")
265+
.upsert(Arrays.asList("col4"))
266+
.records(records)
267+
.build();
268+
269+
InsertRequest result = vaultClient.getBulkInsertRequestBody(request, vaultConfig);
270+
Assert.assertEquals("table4", result.getTableName().get());
271+
Assert.assertEquals("table3", result.getRecords().get().get(0).getTableName().get());
272+
Assert.assertEquals("col3", result.getRecords().get().get(0).getUpsert().get().getUniqueColumns().get().get(0));
273+
Assert.assertEquals("col4", result.getUpsert().get().getUniqueColumns().get().get(0));
274+
}
275+
276+
154277
// Helper methods for reflection field access
155278
private Object getPrivateField(Object obj, String fieldName) throws Exception {
156279
java.lang.reflect.Field field = obj.getClass().getDeclaredField(fieldName);

0 commit comments

Comments
 (0)