Skip to content

Commit 86c7d03

Browse files
authored
Merge pull request #245 from Shuffle/0x0elliot/onprem-execution-length-issue
Fix: Unable to Execute Python Code Producing 10 MB Output
2 parents f6dcadb + 4bfb471 commit 86c7d03

File tree

2 files changed

+282
-21
lines changed

2 files changed

+282
-21
lines changed

db-connector.go

Lines changed: 71 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,17 @@ func SetWorkflowExecution(ctx context.Context, workflowExecution WorkflowExecuti
598598
workflowExecution.Workflow.Comments[actionIndex].Position.Y = float64(0)
599599
}
600600

601+
// Compresses and removes unecessary things
602+
workflowExecution, _ := compressExecution(ctx, workflowExecution, "db-connector save")
603+
604+
executionData, err = json.Marshal(workflowExecution)
605+
if err != nil {
606+
log.Printf("[ERROR] Failed marshalling execution for ES: %s", err)
607+
return err
608+
}
609+
610+
log.Printf("[INFO] Final string size of execution is: %d", len(executionData))
611+
601612
err = indexEs(ctx, nameKey, workflowExecution.ExecutionId, executionData)
602613
if err != nil {
603614
log.Printf("[ERROR] Failed saving new execution %s: %s", workflowExecution.ExecutionId, err)
@@ -1040,9 +1051,6 @@ func GetExecutionVariables(ctx context.Context, executionId string) (string, int
10401051
}
10411052

10421053
func getExecutionFileValue(ctx context.Context, workflowExecution WorkflowExecution, action ActionResult) (string, error) {
1043-
projectName := os.Getenv("SHUFFLE_GCEPROJECT")
1044-
bucketName := project.BucketName
1045-
10461054
fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, action.Action.ID)
10471055

10481056
cacheKey := fmt.Sprintf("%s_%s_action_replace", workflowExecution.ExecutionId, action.Action.ID)
@@ -1054,29 +1062,56 @@ func getExecutionFileValue(ctx context.Context, workflowExecution WorkflowExecut
10541062
}
10551063
}
10561064

1057-
bucket := project.StorageClient.Bucket(bucketName)
1058-
obj := bucket.Object(fullParsedPath)
1059-
fileReader, err := obj.NewReader(ctx)
1060-
if err != nil {
1061-
log.Printf("[ERROR] Failed reading file '%s' from bucket %s: %s. Will try with alternative solution.", fullParsedPath, bucketName, err)
1065+
var data []byte
1066+
var err error
10621067

1063-
if projectName != "shuffler" {
1064-
bucketName = fmt.Sprintf("%s.appspot.com", projectName)
1065-
bucket = project.StorageClient.Bucket(bucketName)
1066-
obj = bucket.Object(fullParsedPath)
1067-
fileReader, err = obj.NewReader(ctx)
1068-
if err != nil {
1069-
log.Printf("[ERROR] Failed reading file '%s' again from bucket %s: %s", fullParsedPath, bucketName, err)
1070-
return "", err
1068+
if project.DbType == "opensearch" {
1069+
// On-premise: read from local filesystem
1070+
basepath := os.Getenv("SHUFFLE_FILE_LOCATION")
1071+
if len(basepath) == 0 {
1072+
basepath = "files"
1073+
}
1074+
1075+
localPath := fmt.Sprintf("%s/%s", basepath, fullParsedPath)
1076+
data, err = ioutil.ReadFile(localPath)
1077+
if err != nil {
1078+
// Use DEBUG for file not found (expected on first save), ERROR for other issues
1079+
if os.IsNotExist(err) {
1080+
log.Printf("[DEBUG] File '%s' does not exist yet (expected on first save): %s", localPath, err)
1081+
} else {
1082+
log.Printf("[ERROR] Failed reading file '%s' from local storage: %s", localPath, err)
10711083
}
1072-
} else {
10731084
return "", err
10741085
}
1075-
}
1086+
} else {
1087+
// Cloud: read from bucket
1088+
projectName := os.Getenv("SHUFFLE_GCEPROJECT")
1089+
bucketName := project.BucketName
1090+
1091+
bucket := project.StorageClient.Bucket(bucketName)
1092+
obj := bucket.Object(fullParsedPath)
1093+
fileReader, err := obj.NewReader(ctx)
1094+
if err != nil {
1095+
log.Printf("[ERROR] Failed reading file '%s' from bucket %s: %s. Will try with alternative solution.", fullParsedPath, bucketName, err)
10761096

1077-
data, err := ioutil.ReadAll(fileReader)
1078-
if err != nil {
1079-
return "", err
1097+
if projectName != "shuffler" {
1098+
bucketName = fmt.Sprintf("%s.appspot.com", projectName)
1099+
bucket = project.StorageClient.Bucket(bucketName)
1100+
obj = bucket.Object(fullParsedPath)
1101+
fileReader, err = obj.NewReader(ctx)
1102+
if err != nil {
1103+
log.Printf("[ERROR] Failed reading file '%s' again from bucket %s: %s", fullParsedPath, bucketName, err)
1104+
return "", err
1105+
}
1106+
} else {
1107+
return "", err
1108+
}
1109+
}
1110+
1111+
data, err = ioutil.ReadAll(fileReader)
1112+
if err != nil {
1113+
return "", err
1114+
}
10801115
}
10811116

10821117
if project.CacheDb {
@@ -1651,6 +1686,21 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e
16511686
}
16521687
}
16531688

1689+
if strings.Contains(workflowExecution.Result, "Result too large to handle") {
1690+
baseResult := &ActionResult{
1691+
Result: workflowExecution.Result,
1692+
Action: Action{ID: "execution_result"},
1693+
}
1694+
1695+
newValue, err := getExecutionFileValue(ctx, *workflowExecution, *baseResult)
1696+
if err != nil {
1697+
log.Printf("[DEBUG][%s] Failed to parse in execution file value for Result: %s", workflowExecution.ExecutionId, err)
1698+
} else {
1699+
log.Printf("[DEBUG][%s] Found a new value to parse with Result field", workflowExecution.ExecutionId)
1700+
workflowExecution.Result = newValue
1701+
}
1702+
}
1703+
16541704
for valueIndex, value := range workflowExecution.Results {
16551705
if strings.Contains(value.Result, "Result too large to handle") {
16561706
newValue, err := getExecutionFileValue(ctx, *workflowExecution, value)

shared.go

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17729,6 +17729,217 @@ func compressExecution(ctx context.Context, workflowExecution WorkflowExecution,
1772917729
}
1773017730
}
1773117731
}
17732+
} else {
17733+
// OpenSearch (on-premise) handling
17734+
// log.Printf("[DEBUG] Result length is %d for execution Id %s, %s", len(tmpJson), workflowExecution.ExecutionId, saveLocationInfo)
17735+
if len(tmpJson) >= 1000000 {
17736+
// Clean up results' actions
17737+
17738+
log.Printf("[DEBUG][%s](%s) ExecutionVariables size: %d, Result size: %d, executionArgument size: %d, Results size: %d", workflowExecution.ExecutionId, saveLocationInfo, len(workflowExecution.ExecutionVariables), len(workflowExecution.Result), len(workflowExecution.ExecutionArgument), len(workflowExecution.Results))
17739+
17740+
dbSave = true
17741+
//log.Printf("[WARNING][%s] Result length is too long (%d) when running %s! Need to reduce result size. Attempting auto-compression by saving data to disk.", workflowExecution.ExecutionId, len(tmpJson), saveLocationInfo)
17742+
actionId := "execution_argument"
17743+
17744+
// Arbitrary reduction size
17745+
maxSize := 50000
17746+
basepath := os.Getenv("SHUFFLE_FILE_LOCATION")
17747+
if len(basepath) == 0 {
17748+
basepath = "files"
17749+
}
17750+
17751+
log.Printf("[DEBUG] Execution Argument length is %d for execution Id %s (%s)", len(workflowExecution.ExecutionArgument), workflowExecution.ExecutionId, saveLocationInfo)
17752+
17753+
if len(workflowExecution.ExecutionArgument) > maxSize {
17754+
itemSize := len(workflowExecution.ExecutionArgument)
17755+
baseResult := fmt.Sprintf(`{
17756+
"success": false,
17757+
"reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).",
17758+
"size": %d,
17759+
"extra": "",
17760+
"id": "%s_%s"
17761+
}`, itemSize, workflowExecution.ExecutionId, actionId)
17762+
17763+
log.Printf("[DEBUG] len(executionArgument) is %d for execution Id %s", len(workflowExecution.ExecutionArgument), workflowExecution.ExecutionId)
17764+
17765+
fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, actionId)
17766+
localPath := fmt.Sprintf("%s/%s", basepath, fullParsedPath)
17767+
17768+
// Write file to local filesystem
17769+
if err := ioutil.WriteFile(localPath, []byte(workflowExecution.ExecutionArgument), 0644); err != nil {
17770+
// Try creating directory if write fails
17771+
dirPath := fmt.Sprintf("%s/large_executions/%s", basepath, workflowExecution.ExecutionOrg)
17772+
if mkdirErr := os.MkdirAll(dirPath, 0755); mkdirErr != nil {
17773+
log.Printf("[WARNING] Failed creating directory %s: %s (original write error: %s)", dirPath, mkdirErr, err)
17774+
workflowExecution.ExecutionArgument = baseResult
17775+
} else {
17776+
// Retry write after creating directory
17777+
if retryErr := ioutil.WriteFile(localPath, []byte(workflowExecution.ExecutionArgument), 0644); retryErr != nil {
17778+
log.Printf("[WARNING] Failed writing new exec file to local storage after creating directory: %s", retryErr)
17779+
workflowExecution.ExecutionArgument = baseResult
17780+
} else {
17781+
log.Printf("[DEBUG] Saved execution argument to local file %s", localPath)
17782+
workflowExecution.ExecutionArgument = fmt.Sprintf(`{
17783+
"success": false,
17784+
"reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).",
17785+
"size": %d,
17786+
"extra": "replace",
17787+
"id": "%s_%s"
17788+
}`, itemSize, workflowExecution.ExecutionId, actionId)
17789+
}
17790+
}
17791+
} else {
17792+
log.Printf("[DEBUG] Saved execution argument to local file %s", localPath)
17793+
workflowExecution.ExecutionArgument = fmt.Sprintf(`{
17794+
"success": false,
17795+
"reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).",
17796+
"size": %d,
17797+
"extra": "replace",
17798+
"id": "%s_%s"
17799+
}`, itemSize, workflowExecution.ExecutionId, actionId)
17800+
}
17801+
}
17802+
17803+
newResults := []ActionResult{}
17804+
//shuffle-large-executions
17805+
for _, item := range workflowExecution.Results {
17806+
log.Printf("[DEBUG] Result length is %d for execution Id %s (%s)", len(item.Result), workflowExecution.ExecutionId, saveLocationInfo)
17807+
if len(item.Result) > maxSize {
17808+
log.Printf("[WARNING][%s](%s) result length is larger than maxSize for %s (%d)", workflowExecution.ExecutionId, saveLocationInfo, item.Action.Label, len(item.Result))
17809+
17810+
itemSize := len(item.Result)
17811+
baseResult := fmt.Sprintf(`{
17812+
"success": false,
17813+
"reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).",
17814+
"size": %d,
17815+
"extra": "",
17816+
"id": "%s_%s"
17817+
}`, itemSize, workflowExecution.ExecutionId, item.Action.ID)
17818+
17819+
// 1. Get the value and set it instead if it exists
17820+
// 2. If it doesn't exist, add it
17821+
_, err := getExecutionFileValue(ctx, workflowExecution, item)
17822+
if err == nil {
17823+
//log.Printf("[DEBUG][%s] Found execution file locally for '%s'. Not saving another.", workflowExecution.ExecutionId, item.Action.Label)
17824+
} else {
17825+
fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, item.Action.ID)
17826+
localPath := fmt.Sprintf("%s/%s", basepath, fullParsedPath)
17827+
17828+
// Try writing file, create directory if needed
17829+
if err := ioutil.WriteFile(localPath, []byte(item.Result), 0644); err != nil {
17830+
// Try creating directory if write fails
17831+
dirPath := fmt.Sprintf("%s/large_executions/%s", basepath, workflowExecution.ExecutionOrg)
17832+
if mkdirErr := os.MkdirAll(dirPath, 0755); mkdirErr != nil {
17833+
log.Printf("[WARNING][%s] Failed creating directory and writing file: %s (original: %s)", workflowExecution.ExecutionId, mkdirErr, err)
17834+
item.Result = baseResult
17835+
newResults = append(newResults, item)
17836+
continue
17837+
}
17838+
17839+
// Retry write after creating directory
17840+
if retryErr := ioutil.WriteFile(localPath, []byte(item.Result), 0644); retryErr != nil {
17841+
log.Printf("[WARNING][%s] Failed writing new exec file to local storage after creating directory: %s", workflowExecution.ExecutionId, retryErr)
17842+
item.Result = baseResult
17843+
newResults = append(newResults, item)
17844+
continue
17845+
}
17846+
}
17847+
17848+
log.Printf("[DEBUG] Saved action result to local file %s", localPath)
17849+
}
17850+
17851+
item.Result = fmt.Sprintf(`{
17852+
"success": false,
17853+
"reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).",
17854+
"size": %d,
17855+
"extra": "replace",
17856+
"id": "%s_%s"
17857+
}`, itemSize, workflowExecution.ExecutionId, item.Action.ID)
17858+
}
17859+
17860+
newResults = append(newResults, item)
17861+
// log.Printf("[DEBUG][%s] newResults: %d and item labelled %s length is: %d", workflowExecution.ExecutionId, len(newResults), item.Action.Label, len(item.Result))
17862+
}
17863+
17864+
// log.Printf("[DEBUG][%s](%s) Overwriting executions results now! newResults length: %d", workflowExecution.ExecutionId, saveLocationInfo, len(newResults))
17865+
workflowExecution.Results = newResults
17866+
17867+
// Handle WorkflowExecution.Result field if too large
17868+
if len(workflowExecution.Result) > maxSize {
17869+
log.Printf("[WARNING][%s] Result field is too large (%d bytes), saving to file", workflowExecution.ExecutionId, len(workflowExecution.Result))
17870+
17871+
itemSize := len(workflowExecution.Result)
17872+
actionId := "execution_result"
17873+
baseResult := fmt.Sprintf(`{
17874+
"success": false,
17875+
"reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).",
17876+
"size": %d,
17877+
"extra": "",
17878+
"id": "%s_%s"
17879+
}`, itemSize, workflowExecution.ExecutionId, actionId)
17880+
17881+
fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, actionId)
17882+
localPath := fmt.Sprintf("%s/%s", basepath, fullParsedPath)
17883+
17884+
// Write file to local filesystem
17885+
if err := ioutil.WriteFile(localPath, []byte(workflowExecution.Result), 0644); err != nil {
17886+
// Try creating directory if write fails
17887+
dirPath := fmt.Sprintf("%s/large_executions/%s", basepath, workflowExecution.ExecutionOrg)
17888+
if mkdirErr := os.MkdirAll(dirPath, 0755); mkdirErr != nil {
17889+
log.Printf("[WARNING] Failed creating directory %s: %s (original write error: %s)", dirPath, mkdirErr, err)
17890+
workflowExecution.Result = baseResult
17891+
} else {
17892+
// Retry write after creating directory
17893+
if retryErr := ioutil.WriteFile(localPath, []byte(workflowExecution.Result), 0644); retryErr != nil {
17894+
log.Printf("[WARNING] Failed writing Result file to local storage after creating directory: %s", retryErr)
17895+
workflowExecution.Result = baseResult
17896+
} else {
17897+
log.Printf("[DEBUG] Saved Result field to local file %s", localPath)
17898+
workflowExecution.Result = fmt.Sprintf(`{
17899+
"success": false,
17900+
"reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).",
17901+
"size": %d,
17902+
"extra": "replace",
17903+
"id": "%s_%s"
17904+
}`, itemSize, workflowExecution.ExecutionId, actionId)
17905+
}
17906+
}
17907+
} else {
17908+
log.Printf("[DEBUG] Saved Result field to local file %s", localPath)
17909+
workflowExecution.Result = fmt.Sprintf(`{
17910+
"success": false,
17911+
"reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).",
17912+
"size": %d,
17913+
"extra": "replace",
17914+
"id": "%s_%s"
17915+
}`, itemSize, workflowExecution.ExecutionId, actionId)
17916+
}
17917+
}
17918+
}
17919+
17920+
jsonString, err := json.Marshal(workflowExecution)
17921+
if err == nil {
17922+
log.Printf("[DEBUG] Execution size: %d for %s", len(jsonString), workflowExecution.ExecutionId)
17923+
if len(jsonString)> 1000000 {
17924+
log.Printf("[WARNING][%s] Execution size is still too large (%d) when running %s!", workflowExecution.ExecutionId, len(jsonString), saveLocationInfo)
17925+
17926+
for resultIndex, result := range workflowExecution.Results {
17927+
actionData, err := json.Marshal(result.Action)
17928+
if err == nil {
17929+
// log.Printf("[DEBUG] Result Size (%s - action: %d). Value size: %d", result.Action.Label, len(actionData), len(result.Result))
17930+
}
17931+
17932+
if len(actionData) > 10000 {
17933+
for paramIndex, param := range result.Action.Parameters {
17934+
if len(param.Value) > 10000 {
17935+
// log.Printf("[WARNING][%s] Parameter %s in action %s is too large (%d). Removing value.", workflowExecution.ExecutionId, param.Name, result.Action.Label, len(param.Value))
17936+
workflowExecution.Results[resultIndex].Action.Parameters[paramIndex].Value = "Size too large. Removed."
17937+
}
17938+
}
17939+
}
17940+
}
17941+
}
17942+
}
1773217943
}
1773317944
}
1773417945

0 commit comments

Comments
 (0)