diff --git a/skyflow/vaultapi/insert.go b/skyflow/vaultapi/insert.go index c81585dc..b79afa71 100644 --- a/skyflow/vaultapi/insert.go +++ b/skyflow/vaultapi/insert.go @@ -134,60 +134,61 @@ func (insertApi *InsertApi) Post(ctx context.Context, token string) (common.Resp if err != nil { return nil, err } - jsonRecord, _ := json.Marshal(insertApi.Records) - var insertRecord common.InsertRecords - if err := json.Unmarshal(jsonRecord, &insertRecord); err != nil { - logger.Error(fmt.Sprintf(messages.INVALID_RECORDS, insertTag)) - return nil, errors.NewSkyflowError(errors.ErrorCodesEnum(errors.SdkErrorCode), fmt.Sprintf(messages.INVALID_RECORDS, insertTag)) - } + if insertApi.Options.ContinueOnError { + jsonRecord, _ := json.Marshal(insertApi.Records) + var insertRecord common.InsertRecords + if err := json.Unmarshal(jsonRecord, &insertRecord); err != nil { + logger.Error(fmt.Sprintf(messages.INVALID_RECORDS, insertTag)) + return nil, errors.NewSkyflowError(errors.ErrorCodesEnum(errors.SdkErrorCode), fmt.Sprintf(messages.INVALID_RECORDS, insertTag)) + } + record, err := insertApi.constructBatchRequestBody(insertRecord, insertApi.Options) + if err != nil { + return nil, err + } + requestBody, err1 := json.Marshal(record) - record, err := insertApi.constructRequestBody(insertRecord, insertApi.Options) - if err != nil { - return nil, err - } - requestBody, err1 := json.Marshal(record) - if err1 != nil { - logger.Error(fmt.Sprintf(messages.EMPTY_RECORDS, insertTag)) - return nil, errors.NewSkyflowError(errors.ErrorCodesEnum(errors.SdkErrorCode), fmt.Sprintf(messages.UNKNOWN_ERROR, insertTag, err1)) - } - requestUrl := fmt.Sprintf("%s/v1/vaults/%s", insertApi.Configuration.VaultURL, insertApi.Configuration.VaultID) - var request *http.Request - if ctx != nil { - request, _ = http.NewRequestWithContext( - ctx, - "POST", - requestUrl, - strings.NewReader(string(requestBody)), - ) - } else { - request, _ = http.NewRequest( - "POST", - requestUrl, - strings.NewReader(string(requestBody)), - ) - } - bearerToken := fmt.Sprintf("Bearer %s", token) - request.Header.Add("Authorization", bearerToken) - skyMetadata := common.CreateJsonMetadata() - request.Header.Add("sky-metadata", skyMetadata) - logger.Info(fmt.Sprintf(messages.INSERTING_RECORDS, insertTag, insertApi.Configuration.VaultID)) - res, err2 := Client.Do(request) - var requestId = "" - var code = "500" - if res != nil { - requestId = res.Header.Get("x-request-id") - code = strconv.Itoa(res.StatusCode) - } - if err2 != nil { - logger.Error(fmt.Sprintf(messages.SERVER_ERROR, insertTag, common.AppendRequestId(fmt.Sprintf(messages.SERVER_ERROR, insertTag, err2), requestId))) - return nil, errors.NewSkyflowError(errors.ErrorCodesEnum(code), common.AppendRequestId(fmt.Sprintf(messages.SERVER_ERROR, insertTag, err2), requestId)) - } + if err1 != nil { + logger.Error(fmt.Sprintf(messages.EMPTY_RECORDS, insertTag)) + return nil, errors.NewSkyflowError(errors.ErrorCodesEnum(errors.SdkErrorCode), fmt.Sprintf(messages.UNKNOWN_ERROR, insertTag, err1)) + } + requestUrl := fmt.Sprintf("%s/v1/vaults/%s", insertApi.Configuration.VaultURL, insertApi.Configuration.VaultID) - data, _ := ioutil.ReadAll(res.Body) - defer res.Body.Close() - var result map[string]interface{} - err2 = json.Unmarshal(data, &result) - if insertApi.Options.ContinueOnError { + var request *http.Request + if ctx != nil { + request, _ = http.NewRequestWithContext( + ctx, + "POST", + requestUrl, + strings.NewReader(string(requestBody)), + ) + } else { + request, _ = http.NewRequest( + "POST", + requestUrl, + strings.NewReader(string(requestBody)), + ) + } + bearerToken := fmt.Sprintf("Bearer %s", token) + request.Header.Add("Authorization", bearerToken) + skyMetadata := common.CreateJsonMetadata() + request.Header.Add("sky-metadata", skyMetadata) + logger.Info(fmt.Sprintf(messages.INSERTING_RECORDS, insertTag, insertApi.Configuration.VaultID)) + res, err2 := Client.Do(request) + var requestId = "" + var code = "500" + if res != nil { + requestId = res.Header.Get("x-request-id") + code = strconv.Itoa(res.StatusCode) + } + if err2 != nil { + logger.Error(fmt.Sprintf(messages.SERVER_ERROR, insertTag, common.AppendRequestId(fmt.Sprintf(messages.SERVER_ERROR, insertTag, err2), requestId))) + return nil, errors.NewSkyflowError(errors.ErrorCodesEnum(code), common.AppendRequestId(fmt.Sprintf(messages.SERVER_ERROR, insertTag, err2), requestId)) + } + + data, _ := ioutil.ReadAll(res.Body) + defer res.Body.Close() + var result map[string]interface{} + err2 = json.Unmarshal(data, &result) if err2 != nil { logger.Error(fmt.Sprintf(messages.SERVER_ERROR, insertTag, common.AppendRequestId(string(data), requestId))) return nil, errors.NewSkyflowError(errors.ErrorCodesEnum(errors.SdkErrorCode), fmt.Sprintf(messages.UNKNOWN_ERROR, insertTag, common.AppendRequestId(string(data), requestId))) @@ -202,20 +203,144 @@ func (insertApi *InsertApi) Post(ctx context.Context, token string) (common.Resp } return response, nil } else { - if err2 != nil { - logger.Error(fmt.Sprintf(messages.SERVER_ERROR, insertTag, common.AppendRequestId(string(data), requestId))) - return nil, errors.NewSkyflowError(errors.ErrorCodesEnum(errors.SdkErrorCode), fmt.Sprintf(messages.UNKNOWN_ERROR, insertTag, common.AppendRequestId(string(data), requestId))) - } else if result["error"] != nil { - var generatedError = (result["error"]).(map[string]interface{}) - logger.Error(fmt.Sprintf(messages.SERVER_ERROR, insertTag, common.AppendRequestId(generatedError["message"].(string), requestId))) - return nil, errors.NewSkyflowError(errors.ErrorCodesEnum(fmt.Sprintf("%v", generatedError["http_code"])), fmt.Sprintf(messages.SERVER_ERROR, insertTag, common.AppendRequestId(generatedError["message"].(string), requestId))) + records := insertApi.arrangeRecords(insertApi.Records["records"].([]interface{})) + recordssArray := records["RECORDS"].(map[string]interface{}) + var finalSuccess []interface{} + var finalError []map[string]interface{} + responseChannels := make([]chan map[string]interface{}, len(recordssArray)) + + logger.Info(fmt.Sprintf(messages.INSERTING_RECORDS, insertTag, insertApi.Configuration.VaultID)) + i := 0 + for index := range recordssArray { + responseChannel := make(chan map[string]interface{}) + responseChannels[i] = responseChannel + + tableName := index + requestUrl := fmt.Sprintf("%s/v1/vaults/%s/%s", insertApi.Configuration.VaultURL, insertApi.Configuration.VaultID, index) + var UniqueColumn = getUniqueColumn(index, insertApi.Options.Upsert) + insertRecord := recordssArray[index].([]map[string]interface{}) + go func(i int, responseChannel chan<- map[string]interface{}) { + record, err := insertApi.constructBulkRequestBody(insertRecord, insertApi.Options) + if err == nil { + record["upsert"] = UniqueColumn + requestBody, err1 := json.Marshal(record) + if err1 != nil { + logger.Error(fmt.Sprintf(messages.EMPTY_RECORDS, insertTag)) + return + } + var request *http.Request + if ctx != nil { + request, _ = http.NewRequestWithContext( + ctx, + "POST", + requestUrl, + strings.NewReader(string(requestBody)), + ) + } else { + request, _ = http.NewRequest( + "POST", + requestUrl, + strings.NewReader(string(requestBody)), + ) + } + bearerToken := fmt.Sprintf("Bearer %s", token) + request.Header.Add("Authorization", bearerToken) + skyMetadata := common.CreateJsonMetadata() + request.Header.Add("sky-metadata", skyMetadata) + res, err := Client.Do(request) + var requestId = "" + if res != nil { + requestId = res.Header.Get("x-request-id") + } + if err != nil { + logger.Error(fmt.Sprintf(messages.SERVER_ERROR, insertTag, common.AppendRequestId(fmt.Sprintf(messages.SERVER_ERROR, insertTag, err), requestId))) + var error = make(map[string]interface{}) + var errorObj = make(map[string]interface{}) + errorObj["code"] = "500" + errorObj["description"] = common.AppendRequestId(fmt.Sprintf(messages.SERVER_ERROR, insertTag, err), requestId) + error["error"] = errorObj + var errorObject = make(map[string]interface{}) + errorObject = insertApi.addIndexInErrorObject(error, insertRecord) + responseChannel <- errorObject + return + } + data, _ := ioutil.ReadAll(res.Body) + + defer res.Body.Close() + var result map[string]interface{} + err = json.Unmarshal(data, &result) + + if err != nil { + logger.Error(fmt.Sprintf(messages.SERVER_ERROR, insertTag, common.AppendRequestId(string(data), requestId))) + var error = make(map[string]interface{}) + var errorObj = make(map[string]interface{}) + errorObj["code"] = "500" + errorObj["description"] = fmt.Sprintf(messages.UNKNOWN_ERROR, insertTag, common.AppendRequestId(string(data), requestId)) + error["error"] = errorObj + var errorObject = make(map[string]interface{}) + errorObject = insertApi.addIndexInErrorObject(error, insertRecord) + responseChannel <- errorObject + } else { + errorResult := result["error"] + if errorResult != nil { + var generatedError = (errorResult).(map[string]interface{}) + var error = make(map[string]interface{}) + var errorObj = make(map[string]interface{}) + errorObj["code"] = fmt.Sprintf("%v", (errorResult.(map[string]interface{}))["http_code"]) + errorObj["description"] = common.AppendRequestId((generatedError["message"]).(string), requestId) + error["error"] = errorObj + var errorObject = make(map[string]interface{}) + errorObject = insertApi.addIndexInErrorObject(error, insertRecord) + responseChannel <- errorObject + } else { + var record = make(map[string]interface{}) + record = insertApi.buildResponseWithoutContinueOnErr(result, tableName, insertRecord) + delete(record, "valueType") + responseChannel <- record + } + } + } + }(i, responseChannel) + i++ + } + for _, responseChan := range responseChannels { + response := <-responseChan + if _, found := response["errors"]; found { + finalErrorsArray := response["errors"].([]interface{}) + for i := range finalErrorsArray { + finalError = append(finalError, finalErrorsArray[i].(map[string]interface{})) + } + } else { + finalArray := response["records"].([]interface{}) + for i := range finalArray { + finalSuccess = append(finalSuccess, response["records"].([]interface{})[i]) + } + } + } + + var finalRecord = make(map[string]interface{}) + var Partial bool + if len(finalSuccess) != 0 && (len(finalError) != 0) { + Partial = true } - logger.Info(fmt.Sprintf(messages.INSERTING_RECORDS_SUCCESS, insertTag, insertApi.Configuration.VaultID)) - return insertApi.buildResponseWithoutContinueOnErr((result["responses"]).([]interface{}), insertRecord), nil + if Partial { + logger.Error(fmt.Sprintf(messages.PARTIAL_SUCCESS, insertTag)) + } else if len(finalSuccess) == 0 { + logger.Error(fmt.Sprintf(messages.BATCH_INSERT_FAILURE, insertTag)) + } else { + logger.Info(fmt.Sprintf(messages.INSERTING_RECORDS_SUCCESS, insertTag, insertApi.Configuration.VaultID)) + } + + if finalSuccess != nil { + finalRecord["records"] = finalSuccess + } + if finalError != nil { + finalRecord["errors"] = finalError + } + return finalRecord, nil } } - -func (InsertApi *InsertApi) constructRequestBody(record common.InsertRecords, options common.InsertOptions) (map[string]interface{}, *errors.SkyflowError) { +func (InsertApi *InsertApi) constructBatchRequestBody(record common.InsertRecords, options common.InsertOptions) (map[string]interface{}, *errors.SkyflowError) { postPayload := []interface{}{} records := record.Records for _, value := range records { @@ -244,42 +369,104 @@ func (InsertApi *InsertApi) constructRequestBody(record common.InsertRecords, op } return body, nil } +func (InsertApi *InsertApi) constructBulkRequestBody(record []map[string]interface{}, options common.InsertOptions) (map[string]interface{}, *errors.SkyflowError) { + body := make(map[string]interface{}) + body["quorum"] = true + body["records"] = record + body["tokenization"] = options.Tokens + return body, nil +} -func (insertApi *InsertApi) buildResponseWithoutContinueOnErr(responseJson []interface{}, requestRecords common.InsertRecords) common.ResponseBody { +func (InsertApi *InsertApi) arrangeRecords(recordsArray []interface{}) map[string]interface{} { + result := make(map[string]interface{}) + recordGroups := make(map[string]interface{}) // Group by table - var inputRecords = requestRecords.Records + for index, record := range recordsArray { + rec := record.(map[string]interface{}) + table := rec["table"].(string) + + fieldsInterface, fieldsExists := rec["fields"] + tokensInterface, tokensExists := rec["tokens"] + var fields map[string]interface{} + if fieldsExists && fieldsInterface != nil { + fields = fieldsInterface.(map[string]interface{}) + } else { + fields = make(map[string]interface{}) + } + var tokens map[string]interface{} + if tokensExists && tokensInterface != nil { + tokens = tokensInterface.(map[string]interface{}) + } else { + tokens = make(map[string]interface{}) + } + group, exists := recordGroups[table] + if !exists { + group = make([]map[string]interface{}, 0) + } + + // Combine fields and tokens maps + combinedMap := map[string]interface{}{ + "fields": fields, + } + if len(tokens) != 0 { + combinedMap["tokens"] = tokens + } + combinedMap["request_index"] = index + + group = append(group.([]map[string]interface{}), combinedMap) + recordGroups[table] = group + } + + result["RECORDS"] = recordGroups + return result +} + +func (InsertApi *InsertApi) addIndexInErrorObject(error map[string]interface{}, insertRecord []map[string]interface{}) common.ResponseBody { + var errorArray = []interface{}{} + var errorObject = make(map[string]interface{}) + + for i := 0; i < len(insertRecord); i++ { + var singleError = make(map[string]interface{}) + var errorObj = make(map[string]interface{}) + errorObj["code"] = error["error"].(map[string]interface{})["code"] + errorObj["description"] = error["error"].(map[string]interface{})["description"] + errorObj["request_index"] = insertRecord[i]["request_index"] + singleError["error"] = errorObj + errorArray = append(errorArray, singleError) + } + errorObject["errors"] = errorArray + return errorObject +} +func (insertApi *InsertApi) buildResponseWithoutContinueOnErr(responseJson map[string]interface{}, tableName string, insertRecord []map[string]interface{}) common.ResponseBody { var recordsArray = []interface{}{} + var records = responseJson["records"].([]interface{}) var responseObject = make(map[string]interface{}) if insertApi.Options.Tokens { - for i := 0; i < len(responseJson); i = i + 1 { - var mainRecord = responseJson[i].(map[string]interface{}) - var record = mainRecord["records"].([]interface{})[0] - id := record.(map[string]interface{})["skyflow_id"] - tokens := record.(map[string]interface{})["tokens"] + for i := 0; i < len(records); i++ { + index := insertRecord[i]["request_index"] + result := make(map[string]interface{}) + id := records[i].(map[string]interface{})["skyflow_id"] + tokens := records[i].(map[string]interface{})["tokens"] - var inputRecord = inputRecords[i] - records := map[string]interface{}{} var fields = tokens.(map[string]interface{}) fields["skyflow_id"] = id - records["request_index"] = i - records["fields"] = fields - records["table"] = inputRecord.Table - recordsArray = append(recordsArray, records) + result["request_index"] = index + result["fields"] = fields + result["table"] = tableName + recordsArray = append(recordsArray, result) } } else { - for i := 0; i < len(responseJson); i++ { - var inputRecord = inputRecords[i] - var record = ((responseJson[i]).(map[string]interface{})["records"]).([]interface{}) + for i := 0; i < len(records); i++ { + index := insertRecord[i]["request_index"] + var record = records[i] var newRecord = make(map[string]interface{}) - newRecord["request_index"] = i - newRecord["table"] = inputRecord.Table - newRecord["fields"] = record[0] + newRecord["request_index"] = index + newRecord["table"] = tableName + newRecord["fields"] = record recordsArray = append(recordsArray, newRecord) - } } responseObject["records"] = recordsArray - return responseObject } func (insertApi *InsertApi) buildResponseWithContinueOnErr(responseJson []interface{}, requestRecords common.InsertRecords, requestId string) (common.ResponseBody, bool) { diff --git a/skyflow/vaultapi/insert_test.go b/skyflow/vaultapi/insert_test.go index 17c71d61..b7ab4997 100644 --- a/skyflow/vaultapi/insert_test.go +++ b/skyflow/vaultapi/insert_test.go @@ -255,13 +255,6 @@ func TestValidRequestWithTokens(t *testing.T) { records := constructInsertRecordsWithTokens() insertApi := InsertApi{Configuration: configuration, Records: records, Options: common.InsertOptions{Tokens: true}} json := `{ - "Header" : { - "x-request-id": "reqId-123" - }, - "StatusCode": "200", - "vaultID": "123", - "responses": [ - { "records": [ { "skyflow_id": "id1", @@ -276,9 +269,7 @@ func TestValidRequestWithTokens(t *testing.T) { } } ] - } - ] - }` + }` r := ioutil.NopCloser(bytes.NewReader([]byte(json))) mocks.GetDoFunc = func(*http.Request) (*http.Response, error) { return &http.Response{ @@ -343,13 +334,7 @@ func TestValidRequestWithContext(t *testing.T) { var upsertOption = common.UpsertOptions{Table: "table1", Column: "column"} upsertArray = append(upsertArray, upsertOption) insertApi := InsertApi{Configuration: configuration, Records: records, Options: common.InsertOptions{Tokens: true, Upsert: upsertArray}} - json := `{ - "Header" : { - "x-request-id": "reqId-123" - }, - "StatusCode": "200", - "vaultID": "123", - "responses": [ + json := ` { "records": [ { @@ -365,9 +350,7 @@ func TestValidRequestWithContext(t *testing.T) { } } ] - } - ] - }` + }` r := ioutil.NopCloser(bytes.NewReader([]byte(json))) mocks.GetDoFunc = func(*http.Request) (*http.Response, error) { return &http.Response{ @@ -387,27 +370,18 @@ func TestValidRequest(t *testing.T) { upsertArray = append(upsertArray, upsertOption) insertApi := InsertApi{Configuration: configuration, Records: records, Options: common.InsertOptions{Tokens: true, Upsert: upsertArray}} json := `{ - "Header" : { - "x-request-id": "reqId-123" - }, - "StatusCode": "200", - "vaultID": "123", - "responses": [ + "records": [ { - "records": [ - { - "skyflow_id": "id1", - "tokens": { - "first_name": "token1", - "primary_card": { - "*": "id2", - "card_number": "token2", - "cvv": "token3", - "expiry_date": "token4" - } - } + "skyflow_id": "id1", + "tokens": { + "first_name": "token1", + "primary_card": { + "*": "id2", + "card_number": "token2", + "cvv": "token3", + "expiry_date": "token4" } - ] + } } ] }` @@ -426,22 +400,14 @@ func TestValidRequestWithTokensFalse(t *testing.T) { configuration := common.Configuration{VaultID: "123", VaultURL: "https://www.google.com", TokenProvider: GetToken} records := constructInsertRecords() insertApi := InsertApi{Configuration: configuration, Records: records, Options: common.InsertOptions{Tokens: false}} - jsonResp := `{ - "Header" : { - "x-request-id": "reqId-123" - }, - "StatusCode": "200", - "vaultID": "123", - "responses": [ + jsonResp := ` { "records": [ { "skyflow_id": "id1" } ] - } - ] - }` + }` r := ioutil.NopCloser(bytes.NewReader([]byte(jsonResp))) mocks.GetDoFunc = func(*http.Request) (*http.Response, error) { return &http.Response{ @@ -462,7 +428,7 @@ func TestValidRequestWithTokensFalse(t *testing.T) { func TestInsertFailure(t *testing.T) { configuration := common.Configuration{VaultID: "123", VaultURL: "https://www.google.com", TokenProvider: GetToken} records := constructInsertRecords() - insertApi := InsertApi{Configuration: configuration, Records: records, Options: common.InsertOptions{Tokens: false}} + insertApi := InsertApi{Configuration: configuration, Records: records, Options: common.InsertOptions{Tokens: false, ContinueOnError: true}} jsonResp := `{ "Header" : { "x-request-id": "reqId-123" @@ -474,8 +440,8 @@ func TestInsertFailure(t *testing.T) { "http_code": "400", "http_status": "Bad Request", "message": "Object Name cards was not found for Vault 123" - } - }` + } + ` r := ioutil.NopCloser(bytes.NewReader([]byte(jsonResp))) mocks.GetDoFunc = func(*http.Request) (*http.Response, error) { return &http.Response{ @@ -496,7 +462,7 @@ func TestValidRequestWithContinueOnError(t *testing.T) { var upsertOption = common.UpsertOptions{Table: "table1", Column: "column"} upsertArray = append(upsertArray, upsertOption) insertApi := InsertApi{Configuration: configuration, Records: records, Options: common.InsertOptions{Tokens: true, Upsert: upsertArray, ContinueOnError: true}} - json := `{ + jsonStr := `{ "Header" : { "x-request-id": "reqId-123" }, @@ -504,8 +470,8 @@ func TestValidRequestWithContinueOnError(t *testing.T) { "vaultID": "123", "responses": [ { - "Body": { - "records": [ + "Body": { + "records": [ { "skyflow_id": "id1", "tokens": { @@ -519,11 +485,11 @@ func TestValidRequestWithContinueOnError(t *testing.T) { } } ] + } } - } ] }` - r := ioutil.NopCloser(bytes.NewReader([]byte(json))) + r := ioutil.NopCloser(bytes.NewReader([]byte(jsonStr))) mocks.GetDoFunc = func(*http.Request) (*http.Response, error) { return &http.Response{ StatusCode: 200, @@ -591,30 +557,19 @@ func TestBuildResponseWithContinueOnErrorCase(t *testing.T) { upsertArray = append(upsertArray, upsertOption) insertApi := InsertApi{Configuration: configuration, Records: records, Options: common.InsertOptions{Tokens: true, Upsert: upsertArray, ContinueOnError: true}} jsonStr := `{ - "Header" : { - "x-request-id": "reqId-123" - }, - "StatusCode": "200", - "vaultID": "123", - "responses": [ + "records": [ { - "Body": { - "records": [ - { - "skyflow_id": "id1", - "tokens": { - "first_name": "token1", - "primary_card": { - "*": "id2", - "card_number": "token2", - "cvv": "token3", - "expiry_date": "token4" - } - } + "skyflow_id": "id1", + "tokens": { + "first_name": "token1", + "primary_card": { + "*": "id2", + "card_number": "token2", + "cvv": "token3", + "expiry_date": "token4" } - ] + } } - } ] }` var data map[string]interface{} @@ -746,13 +701,6 @@ func TestBuildResponseWithoutContinueOnErrorCase(t *testing.T) { upsertArray = append(upsertArray, upsertOption) insertApi := InsertApi{Configuration: configuration, Records: records, Options: common.InsertOptions{Tokens: true, Upsert: upsertArray, ContinueOnError: false}} jsonStr := `{ - "Header" : { - "x-request-id": "reqId-123" - }, - "StatusCode": "200", - "vaultID": "123", - "responses": [ - { "records": [ { "skyflow_id": "id1", @@ -767,9 +715,7 @@ func TestBuildResponseWithoutContinueOnErrorCase(t *testing.T) { } } ] - } - ] - }` + }` var data map[string]interface{} err := json.Unmarshal([]byte(jsonStr), &data) @@ -798,14 +744,351 @@ func TestBuildResponseWithoutContinueOnErrorCase(t *testing.T) { jsonRecord, _ := json.Marshal(records) var insertRecord common.InsertRecords if err := json.Unmarshal(jsonRecord, &insertRecord); err == nil { - if responses, ok := data["responses"].([]interface{}); ok { - actualResponse := insertApi.buildResponseWithoutContinueOnErr(responses, insertRecord) + if responses, ok := data["responses"].(map[string]interface{}); ok { + actualResponse := insertApi.buildResponseWithoutContinueOnErr(responses, "table", records["records"].([]map[string]interface{})) expectedJSON, _ := json.Marshal(expectedResponse) actualJSON, _ := json.Marshal(actualResponse) check(string(expectedJSON), string(actualJSON), t) } } +} +func TestArrangeRecords(t *testing.T) { + configuration := common.Configuration{VaultID: "123", VaultURL: "https://www.google.com", TokenProvider: GetToken} + records := constructInsertRecords() + var upsertArray []common.UpsertOptions + var upsertOption = common.UpsertOptions{Table: "table1", Column: "column"} + upsertArray = append(upsertArray, upsertOption) + insertApi := InsertApi{Configuration: configuration, Records: records, Options: common.InsertOptions{Tokens: true, Upsert: upsertArray, ContinueOnError: false}} + testCase := struct { + recordsArray []interface{} + expected map[string]interface{} + }{ + recordsArray: []interface{}{ + map[string]interface{}{ + "table": "credit_card", + "fields": map[string]interface{}{ + "card_number": "4111111111111142", + }, + "tokens": map[string]interface{}{ + "card_number": "9991-3466-6577-4760", + }, + }, + map[string]interface{}{ + "table": "credit_cards", + "fields": map[string]interface{}{ + "card_number": "4111011111111114", + }, + }, + map[string]interface{}{ + "table": "credit_cardss", + "fields": map[string]interface{}{ + "cvv": "1234", + }, + }, + map[string]interface{}{ + "table": "table3", + "fields": map[string]interface{}{ + "card_pin": "3888", + "card_number": "4101111111111164", + }, + }, + map[string]interface{}{ + "table": "credit_cards", + "fields": map[string]interface{}{ + "card_number": "4111011111111114", + }, + }, + }, + expected: map[string]interface{}{ + "RECORDS": map[string]interface{}{ + "credit_card": []interface{}{ + map[string]interface{}{ + "fields": map[string]interface{}{ + "card_number": "4111111111111142", + }, + "tokens": map[string]interface{}{ + "card_number": "9991-3466-6577-4760", + }, + "request_index": 0, + }, + }, + "credit_cards": []interface{}{ + map[string]interface{}{ + "fields": map[string]interface{}{ + "card_number": "4111011111111114", + }, + "request_index": 1, + }, + map[string]interface{}{ + "fields": map[string]interface{}{ + "card_number": "4111011111111114", + }, + "request_index": 4, + }, + }, + "credit_cardss": []interface{}{ + map[string]interface{}{ + "fields": map[string]interface{}{ + "cvv": "1234", + }, + "request_index": 2, + }, + }, + "table3": []interface{}{ + map[string]interface{}{ + "fields": map[string]interface{}{ + "card_pin": "3888", + "card_number": "4101111111111164", + }, + "request_index": 3, + }, + }, + }, + }, + } + + result := insertApi.arrangeRecords(testCase.recordsArray) + + expectedJSON, _ := json.Marshal(testCase.expected) + actualJSON, _ := json.Marshal(result) + check(string(expectedJSON), string(actualJSON), t) + +} +func TestBuildResponseWithoutContinueOnErrWithTokensAsFalse(t *testing.T) { + + // Test case : Records without tokens + testCase := struct { + responseJson map[string]interface{} + tableName string + insertRecord []map[string]interface{} + expected common.ResponseBody + }{ + responseJson: map[string]interface{}{ + "records": []interface{}{ + map[string]interface{}{ + "skyflow_id": "id1", + }, + map[string]interface{}{ + "skyflow_id": "id2", + }, + }, + }, + tableName: "credit_cards", + insertRecord: []map[string]interface{}{ + { + "fields": map[string]interface{}{"card_number": "4111011111111114"}, + "request_index": 1, + }, + { + "fields": map[string]interface{}{"card_number": "4111011111111114"}, + "request_index": 4, + }, + }, + expected: common.ResponseBody{ + "records": []interface{}{ + map[string]interface{}{ + "request_index": 1, + "fields": map[string]interface{}{ + "skyflow_id": "id1", + }, + "table": "credit_cards", + }, + map[string]interface{}{ + "request_index": 4, + "fields": map[string]interface{}{ + "skyflow_id": "id2", + }, + "table": "credit_cards", + }, + }, + }, + } + insertApi := &InsertApi{ + Options: common.InsertOptions{ + Tokens: false, + ContinueOnError: false, + }, + } + + result := insertApi.buildResponseWithoutContinueOnErr(testCase.responseJson, testCase.tableName, testCase.insertRecord) + + expectedJSON, _ := json.Marshal(testCase.expected) + actualJSON, _ := json.Marshal(result) + check(string(expectedJSON), string(actualJSON), t) + +} +func TestBuildResponseWithoutContinueOnErr(t *testing.T) { + // Test case : Records with tokens + testCase1 := struct { + responseJson map[string]interface{} + tableName string + insertRecord []map[string]interface{} + expected common.ResponseBody + }{ + responseJson: map[string]interface{}{ + "records": []interface{}{ + map[string]interface{}{ + "skyflow_id": "id1", + "tokens": map[string]interface{}{ + "card_number": "token1", + }, + }, + map[string]interface{}{ + "skyflow_id": "id2", + "tokens": map[string]interface{}{ + "card_number": "token2", + }, + }, + }, + }, + tableName: "credit_cards", + insertRecord: []map[string]interface{}{ + { + "fields": map[string]interface{}{"card_number": "4111011111111114"}, + "request_index": 1, + }, + { + "fields": map[string]interface{}{"card_number": "4111011111111114"}, + "request_index": 4, + }, + }, + expected: common.ResponseBody{ + "records": []interface{}{ + map[string]interface{}{ + "request_index": 1, + "fields": map[string]interface{}{ + "card_number": "token1", + "skyflow_id": "id1", + }, + "table": "credit_cards", + }, + map[string]interface{}{ + "request_index": 4, + "fields": map[string]interface{}{ + "card_number": "token2", + "skyflow_id": "id2", + }, + "table": "credit_cards", + }, + }, + }, + } + + runTestCase(t, testCase1) +} + +func runTestCase(t *testing.T, testCase struct { + responseJson map[string]interface{} + tableName string + insertRecord []map[string]interface{} + expected common.ResponseBody +}) { + insertApi := &InsertApi{ + Options: common.InsertOptions{ + Tokens: true, + ContinueOnError: false, + }, + } + + result := insertApi.buildResponseWithoutContinueOnErr(testCase.responseJson, testCase.tableName, testCase.insertRecord) + + expectedJSON, _ := json.Marshal(testCase.expected) + actualJSON, _ := json.Marshal(result) + check(string(expectedJSON), string(actualJSON), t) +} + +func TestAddIndexInErrorObject(t *testing.T) { + // Test case 1: Error object with multiple errors + testCase1 := struct { + error map[string]interface{} + insertRecord []map[string]interface{} + expected common.ResponseBody + }{ + error: map[string]interface{}{ + "error": map[string]interface{}{ + "code": 400, + "description": "Object Name credit_cardss was not found for Vault s41b985164cf4145bbfc2a136f968186 - requestId : a3b8fd62-8d61-9b16-9285-837f80c9c625", + }, + }, + insertRecord: []map[string]interface{}{ + { + "fields": map[string]interface{}{"card_number": "4111011111111114"}, + "request_index": 1, + }, + { + "fields": map[string]interface{}{"card_number": "4111011111111114"}, + "request_index": 4, + }, + }, + expected: common.ResponseBody{ + "errors": []interface{}{ + map[string]interface{}{ + "error": map[string]interface{}{ + "code": 400, + "description": "Object Name credit_cardss was not found for Vault s41b985164cf4145bbfc2a136f968186 - requestId : a3b8fd62-8d61-9b16-9285-837f80c9c625", + "request_index": 1, + }, + }, + map[string]interface{}{ + "error": map[string]interface{}{ + "code": 400, + "description": "Object Name credit_cardss was not found for Vault s41b985164cf4145bbfc2a136f968186 - requestId : a3b8fd62-8d61-9b16-9285-837f80c9c625", + "request_index": 4, + }, + }, + }, + }, + } + + // Test case 2: Error object with a single error + testCase2 := struct { + error map[string]interface{} + insertRecord []map[string]interface{} + expected common.ResponseBody + }{ + error: map[string]interface{}{ + "error": map[string]interface{}{ + "code": 404, + "description": "Object Name user_not_found was not found for Vault s41b985164cf4145bbfc2a136f968186 - requestId : a3b8fd62-8d61-9b16-9285-837f80c9c625", + }, + }, + insertRecord: []map[string]interface{}{ + { + "fields": map[string]interface{}{"card_number": "4111011111111114"}, + "request_index": 1, + }, + }, + expected: common.ResponseBody{ + "errors": []interface{}{ + map[string]interface{}{ + "error": map[string]interface{}{ + "code": 404, + "description": "Object Name user_not_found was not found for Vault s41b985164cf4145bbfc2a136f968186 - requestId : a3b8fd62-8d61-9b16-9285-837f80c9c625", + "request_index": 1, + }, + }, + }, + }, + } + + // Run the test cases + runTestCase2(t, testCase1) + runTestCase2(t, testCase2) +} + +func runTestCase2(t *testing.T, testCase struct { + error map[string]interface{} + insertRecord []map[string]interface{} + expected common.ResponseBody +}) { + insertApi := &InsertApi{} + + result := insertApi.addIndexInErrorObject(testCase.error, testCase.insertRecord) + + if !reflect.DeepEqual(result, testCase.expected) { + t.Errorf("AddIndexInErrorObject result does not match the expected output.\nExpected: %v\nActual: %v", testCase.expected, result) + } } func constructInsertRecords() map[string]interface{} { records := make(map[string]interface{})