Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 152 additions & 65 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,21 +928,21 @@ func HandleGetOrg(resp http.ResponseWriter, request *http.Request) {
// This makes it possible to walk around in the UI for the org

/*
if user.ActiveOrg.Id != org.Id {
log.Printf("[AUDIT] User %s (%s) is admin and has access to org %s. Updating active org to this one.", user.Username, user.Id, org.Id)
user.ActiveOrg.Id = org.Id
user.ActiveOrg.Name = org.Name
user.Role = "admin"
if user.ActiveOrg.Id != org.Id {
log.Printf("[AUDIT] User %s (%s) is admin and has access to org %s. Updating active org to this one.", user.Username, user.Id, org.Id)
user.ActiveOrg.Id = org.Id
user.ActiveOrg.Name = org.Name
user.Role = "admin"

SetUser(ctx, &user, false)
SetUser(ctx, &user, false)

DeleteCache(ctx, fmt.Sprintf("%s_workflows", user.ActiveOrg.Id))
DeleteCache(ctx, fmt.Sprintf("%s_workflows", user.Id))
DeleteCache(ctx, fmt.Sprintf("apps_%s", user.Id))
DeleteCache(ctx, fmt.Sprintf("apps_%s", user.ActiveOrg.Id))
DeleteCache(ctx, fmt.Sprintf("user_%s", user.Username))
DeleteCache(ctx, fmt.Sprintf("user_%s", user.Id))
}
DeleteCache(ctx, fmt.Sprintf("%s_workflows", user.ActiveOrg.Id))
DeleteCache(ctx, fmt.Sprintf("%s_workflows", user.Id))
DeleteCache(ctx, fmt.Sprintf("apps_%s", user.Id))
DeleteCache(ctx, fmt.Sprintf("apps_%s", user.ActiveOrg.Id))
DeleteCache(ctx, fmt.Sprintf("user_%s", user.Username))
DeleteCache(ctx, fmt.Sprintf("user_%s", user.Id))
}
*/

} else {
Expand Down Expand Up @@ -3312,7 +3312,7 @@ func HandleGetEnvironments(resp http.ResponseWriter, request *http.Request) {
if newEnv.Timestamp > 0 && timenow-newEnv.Timestamp > 60 {
newEnvironments[envIndex].RunningIp = ""
newEnvironments[envIndex].Licensed = false
newEnvironments[envIndex].DataLake.Enabled = false
newEnvironments[envIndex].DataLake.Enabled = false
} else {
newEnvironments[envIndex].DataLake = newEnv.DataLake
newEnvironments[envIndex].RunningIp = newEnv.RunningIp
Expand All @@ -3322,7 +3322,7 @@ func HandleGetEnvironments(resp http.ResponseWriter, request *http.Request) {
} else {
newEnvironments[envIndex].RunningIp = ""
newEnvironments[envIndex].Licensed = false
newEnvironments[envIndex].DataLake.Enabled = false
newEnvironments[envIndex].DataLake.Enabled = false
}
}

Expand Down Expand Up @@ -4730,7 +4730,7 @@ func HandleGetTriggers(resp http.ResponseWriter, request *http.Request) {
}

pipelines := []PipelineInfoMini{}
for _, env := range environments {
for _, env := range environments {
if env.Archived {
continue
}
Expand All @@ -4752,12 +4752,12 @@ func HandleGetTriggers(resp http.ResponseWriter, request *http.Request) {
//log.Printf("[DEBUG] Found %d pipelines in %d environments in org %s (%s)", len(pipelines), len(environments), user.ActiveOrg.Name, user.ActiveOrg.Id)

/*
pipelines, err := GetPipelines(ctx, user.ActiveOrg.Id)
if err != nil {
wg.Done()
errChan <- err
return
}
pipelines, err := GetPipelines(ctx, user.ActiveOrg.Id)
if err != nil {
wg.Done()
errChan <- err
return
}
*/
wg.Done()
pipelinesChan <- pipelines
Expand Down Expand Up @@ -4922,26 +4922,26 @@ func HandleGetTriggers(resp http.ResponseWriter, request *http.Request) {
{
// Handled otherwise.
/*
storedPipeline, exist := pipelineMap[trigger.ID]
if exist && storedPipeline.Status != "uninitialized" {
startNode := ""
storedPipeline, exist := pipelineMap[trigger.ID]
if exist && storedPipeline.Status != "uninitialized" {
startNode := ""

storedPipeline.WorkflowId = workflow.ID
storedPipeline.WorkflowId = workflow.ID

if len(workflow.Branches) != 0 {
for _, branch := range workflow.Branches {
if branch.SourceID == trigger.ID {
startNode = branch.DestinationID
if len(workflow.Branches) != 0 {
for _, branch := range workflow.Branches {
if branch.SourceID == trigger.ID {
startNode = branch.DestinationID
}
}
}
}
if startNode == "" {
startNode = workflow.Start
}
storedPipeline.StartNode = startNode
allPipelines = append(allPipelines, storedPipeline)
if startNode == "" {
startNode = workflow.Start
}
storedPipeline.StartNode = startNode
allPipelines = append(allPipelines, storedPipeline)

}
}
*/
}
}
Expand Down Expand Up @@ -8596,7 +8596,6 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
}
}


err = SetWorkflow(ctx, workflow, workflow.ID)
if err != nil {
log.Printf("[ERROR] Failed saving workflow to database: %s", err)
Expand Down Expand Up @@ -8642,7 +8641,6 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
Name: user.ActiveOrg.Name,
}


SetWorkflowRevision(ctx, workflow)
err = SetGitWorkflow(ctx, workflow, org)
if err != nil {
Expand Down Expand Up @@ -9772,13 +9770,13 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
//user.ActiveOrg.Id = workflow.OrgId

workflow = &Workflow{
Name: workflow.Name,
ID: workflow.ID,
Owner: workflow.Owner,
OrgId: workflow.OrgId,
Name: workflow.Name,
ID: workflow.ID,
Owner: workflow.Owner,
OrgId: workflow.OrgId,

OutputYields: workflow.OutputYields,
Sharing: workflow.Sharing,
Sharing: workflow.Sharing,
Description: workflow.Description,
InputQuestions: workflow.InputQuestions,
InputMarkdown: workflow.InputMarkdown,
Expand Down Expand Up @@ -9951,7 +9949,6 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
}
}


if workflow.Public {
workflow.BackupConfig = BackupConfig{}
workflow.ExecutingOrg = OrgMini{}
Expand All @@ -9964,7 +9961,7 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
}
}

if workflow.BackupConfig.TokensEncrypted {
if workflow.BackupConfig.TokensEncrypted {
parsedKey := fmt.Sprintf("%s_upload_token", workflow.OrgId)
newValue, err := HandleKeyDecryption([]byte(workflow.BackupConfig.UploadToken), parsedKey)
if err != nil {
Expand Down Expand Up @@ -11316,8 +11313,6 @@ func HandleCreateSubOrg(resp http.ResponseWriter, request *http.Request) {
return
}



// Update all admins to have access to this suborg
for _, loopUser := range parentOrg.Users {
if loopUser.Role != "admin" {
Expand Down Expand Up @@ -13298,7 +13293,7 @@ func HandleLogin(resp http.ResponseWriter, request *http.Request) {
userdata.ActiveOrg.Id = innerorg.Id
userdata.ActiveOrg.Name = innerorg.Name
org = innerorg

updateUser = true
break
}
Expand Down Expand Up @@ -22205,7 +22200,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
err := json.Unmarshal([]byte(workflowExecution.ExecutionArgument), &validMap)
if err != nil {
log.Printf("[ERROR] Failed to unmarshal execution argument: %s", err)
}
}

// Overwriting it either way. Input NEEDS to be valid for map[string]interface{}{}
discoveredUser, err := HandleApiAuthentication(nil, request)
Expand Down Expand Up @@ -23629,11 +23624,104 @@ func CheckNextActions(ctx context.Context, workflowExecution *WorkflowExecution)
}
*/


for index, actionId := range nextActions {
skippedParents := 0
for _, parent := range parents[actionId] {
_, result := GetActionResult(ctx, *workflowExecution, parent)
if result.Status == "SKIPPED" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you verified this works, both with a few skipped and a lot of them, along with some success, some failure etc?

This feels wrong, but I'm not sure why.

skippedParents += 1
}
}
if skippedParents >= len(parents[actionId]) {
if actionId != workflowExecution.Workflow.Start {
for _, action := range workflowExecution.Workflow.Actions {
if actionId != action.ID {
continue
}
foundAction := GetAction(*workflowExecution, actionId, action.Environment)
err := ActionSkip(foundAction, *workflowExecution, parents[actionId])
if err != nil {
log.Printf("ERROR %s", err)
}
copy(nextActions[index:], nextActions[index+1:])
nextActions[len(nextActions)-1] = ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section is overcomplicated, and I'm worried about indexing issues. Did you test it well, both with a lot of items and 0? This seems like it can crash if there is only 1 item.

I typically make a new array entirely for stuff like this, as index modification is awful. Such as: Why would it be necessary to set a value to "" if you are just about to delete it? That doesn't seem necessary.

nextActions = nextActions[:len(nextActions)-1]
}
}
}
}

return nextActions
}

func ActionSkip(foundAction Action, exec WorkflowExecution, parent []string) error {
newResult := ActionResult{
Action: foundAction,
ExecutionId: exec.ExecutionId,
Authorization: exec.Authorization,
Result: fmt.Sprintf(`{"success": false, "reason": "Skipped because of previous node - %d" - %v}`, len(parent), parent),
StartedAt: 0,
CompletedAt: 0,
Status: "SKIPPED",
}
resultData, err := json.Marshal(newResult)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some space between each section. This is hard to read.

The simplest "rule" for easy readability and space is to have at least one newline at the end of a section, such as:

`if x == y {

}

if x2 == y2 {

}

if err != nil {
log.Printf("[ERROR] Failed skipping action")
return err
}
streamUrl := fmt.Sprintf("http://localhost:5001/api/v1/streams")
if project.Environment == "cloud" {
streamUrl = fmt.Sprintf("https://shuffler.io/api/v1/streams")
if len(os.Getenv("SHUFFLE_GCEPROJECT")) > 0 && len(os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")) > 0 {
streamUrl = fmt.Sprintf("https://%s.%s.r.appspot.com/api/v1/streams", os.Getenv("SHUFFLE_GCEPROJECT"), os.Getenv("SHUFFLE_GCEPROJECT_LOCATION"))
}
if len(os.Getenv("SHUFFLE_CLOUDRUN_URL")) > 0 {
streamUrl = fmt.Sprintf("%s/api/v1/streams", os.Getenv("SHUFFLE_CLOUDRUN_URL"))
}
} else {
if len(os.Getenv("WORKER_HOSTNAME")) > 0 {
streamUrl = fmt.Sprintf("http://%s:33333/api/v1/streams", os.Getenv("WORKER_HOSTNAME"))
}
if os.Getenv("SHUFFLE_OPTIMIZED") == "true" && len(os.Getenv("WORKER_PORT")) > 0 {
streamUrl = fmt.Sprintf("http://localhost:%s/api/v1/streams", os.Getenv("WORKER_PORT"))
} else if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" && (project.Environment == "" || project.Environment == "worker") {
streamUrl = fmt.Sprintf("http://localhost:33333/api/v1/streams")
} else {
if len(os.Getenv("BASE_URL")) > 0 {
streamUrl = fmt.Sprintf("%s/api/v1/streams", os.Getenv("BASE_URL"))
}
}
}
//log.Printf("[DEBUG] Sending skip for action %s (%s) to URL %s", foundAction.Label, foundAction.AppName, streamUrl)
req, err := http.NewRequest(
"POST",
streamUrl,
bytes.NewBuffer([]byte(resultData)),
)
if err != nil {
log.Printf("[ERROR] Error building SKIPPED request (%s): %s", foundAction.Label, err)
return err
}
client := &http.Client{}
newresp, err := client.Do(req)
if err != nil {
log.Printf("[ERROR] Error running SKIPPED request (%s): %s", foundAction.Label, err)
return err
}
defer newresp.Body.Close()
body, err := ioutil.ReadAll(newresp.Body)
if err != nil {
log.Printf("[ERROR] Failed reading body when running SKIPPED request (%s): %s", foundAction.Label, err)
return err
}
//log.Printf("[DEBUG] Skipped body return from %s (%d): %s", streamUrl, newresp.StatusCode, string(body))
if strings.Contains(string(body), "already finished") {
log.Printf("[WARNING] Data couldn't be re-inputted for %s.", foundAction.Label)
// DONT CHANGE THE ERROR OUTPUT HERE
}
return nil
}

// Decideds what should happen next. Used both for cloud & onprem environments
// Added early 2023 as yet another way to standardize decisionmaking of app executions
func DecideExecution(ctx context.Context, workflowExecution WorkflowExecution, environment string) (WorkflowExecution, []Action) {
Expand All @@ -23654,7 +23742,7 @@ func DecideExecution(ctx context.Context, workflowExecution WorkflowExecution, e
}
}

if len(nextActions) == 0 {
if len(nextActions) == 0 {
nextActions = CheckNextActions(ctx, &workflowExecution)
}

Expand Down Expand Up @@ -29547,7 +29635,7 @@ func HandleUserPrivateTraining(resp http.ResponseWriter, request *http.Request)
}

// An API to ONLY return PUBLIC forms for an org
// A public form = Workflow with "sharing": form
// A public form = Workflow with "sharing": form
func HandleGetOrgForms(resp http.ResponseWriter, request *http.Request) {
cors := HandleCors(resp, request)
if cors {
Expand Down Expand Up @@ -29617,14 +29705,14 @@ func HandleGetOrgForms(resp http.ResponseWriter, request *http.Request) {
randomUser := User{
Id: randomUserId,
ActiveOrg: OrgMini{
Id: orgId,
Id: orgId,
Name: org.Name,
},
}

if validAuth {
if validAuth {
randomUser = user
}
}

workflows, err := GetAllWorkflowsByQuery(ctx, randomUser, 50, "")
if err != nil {
Expand All @@ -29643,7 +29731,7 @@ func HandleGetOrgForms(resp http.ResponseWriter, request *http.Request) {

relevantForms := []Workflow{}
for _, workflow := range workflows {
if validAuth {
if validAuth {
if len(workflow.InputQuestions) == 0 && len(workflow.InputMarkdown) == 0 {
continue
}
Expand All @@ -29660,13 +29748,13 @@ func HandleGetOrgForms(resp http.ResponseWriter, request *http.Request) {

// Overwrite to remove anything unecessary for most locations
workflow = Workflow{
Name: workflow.Name,
ID: workflow.ID,
Owner: workflow.Owner,
OrgId: workflow.OrgId,
Name: workflow.Name,
ID: workflow.ID,
Owner: workflow.Owner,
OrgId: workflow.OrgId,

OutputYields: workflow.OutputYields,
Sharing: workflow.Sharing,
Sharing: workflow.Sharing,
Description: workflow.Description,
InputQuestions: workflow.InputQuestions,
InputMarkdown: workflow.InputMarkdown,
Expand Down Expand Up @@ -29700,7 +29788,7 @@ func HandleGetOrgForms(resp http.ResponseWriter, request *http.Request) {
func SendDeleteWorkflowRequest(childWorkflow Workflow, request *http.Request) error {
log.Printf("[INFO] Attempting to delete child workflow %s", childWorkflow.ID)

// Send a Delete request to the workflows
// Send a Delete request to the workflows
baseUrl := "https://shuffler.io"
if len(os.Getenv("BASE_URL")) > 0 {
baseUrl = os.Getenv("BASE_URL")
Expand All @@ -29724,7 +29812,6 @@ func SendDeleteWorkflowRequest(childWorkflow Workflow, request *http.Request) er
return err
}


// Look for Authorization
for key, values := range request.Header {
if len(values) > 0 {
Expand All @@ -29737,7 +29824,7 @@ func SendDeleteWorkflowRequest(childWorkflow Workflow, request *http.Request) er
req.AddCookie(cookie)
}

// Ensure it points correctly, and that you can only delete the ones you have access to
// Ensure it points correctly, and that you can only delete the ones you have access to
if len(childWorkflow.OrgId) > 0 {
req.Header.Add("Org-Id", childWorkflow.OrgId)
}
Expand Down
Loading