Skip to content

Commit 5796277

Browse files
committed
2 parents 6ad1e2c + 84014ed commit 5796277

File tree

2 files changed

+186
-23
lines changed

2 files changed

+186
-23
lines changed

shared.go

Lines changed: 179 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"sort"
1919
"sync"
2020

21+
scheduler "cloud.google.com/go/scheduler/apiv1"
22+
"cloud.google.com/go/scheduler/apiv1/schedulerpb"
2123
"gopkg.in/yaml.v3"
2224

2325
//"os/exec"
@@ -4949,6 +4951,33 @@ func HandleGetTriggers(resp http.ResponseWriter, request *http.Request) {
49494951
}
49504952
}
49514953

4954+
if project.Environment == "cloud" {
4955+
var wg sync.WaitGroup
4956+
scheduleMutex := sync.Mutex{}
4957+
4958+
for index, schedule := range allSchedules {
4959+
wg.Add(1)
4960+
go func(index int, schedule ScheduleOld) {
4961+
defer wg.Done()
4962+
4963+
// Check if the schedule exist in the gcp
4964+
GcpSchedule, err := GetGcpSchedule(ctx, schedule.Id)
4965+
4966+
// Use mutex to safely update the schedule status
4967+
scheduleMutex.Lock()
4968+
if err != nil {
4969+
allSchedules[index].Status = "stopped"
4970+
} else {
4971+
allSchedules[index].Status = GcpSchedule.Status
4972+
}
4973+
4974+
scheduleMutex.Unlock()
4975+
}(index, schedule)
4976+
}
4977+
4978+
wg.Wait()
4979+
}
4980+
49524981
sort.SliceStable(allHooks, func(i, j int) bool {
49534982
return allHooks[i].Info.Name < allHooks[j].Info.Name
49544983
})
@@ -4977,6 +5006,65 @@ func HandleGetTriggers(resp http.ResponseWriter, request *http.Request) {
49775006
resp.Write(newjson)
49785007
}
49795008

5009+
func GetGcpSchedule(ctx context.Context, id string) (*ScheduleOld, error) {
5010+
5011+
// Check if we have the schedule in cache
5012+
cacheData, err := GetCache(ctx, fmt.Sprintf("schedule-%s", id))
5013+
if err == nil {
5014+
data, ok := cacheData.([]byte)
5015+
if !ok {
5016+
log.Printf("[ERROR] Cache data for %s is not of type []byte", id)
5017+
} else {
5018+
schedule := &ScheduleOld{}
5019+
err = json.Unmarshal(data, schedule)
5020+
if err != nil {
5021+
log.Printf("[ERROR] Failed to unmarshal schedule cache for %s: %s", id, err)
5022+
} else {
5023+
return schedule, nil
5024+
}
5025+
}
5026+
}
5027+
5028+
schedule := &ScheduleOld{}
5029+
c, err := scheduler.NewCloudSchedulerClient(ctx)
5030+
if err != nil {
5031+
log.Printf("[ERROR] Client error: %s", err)
5032+
return schedule, err
5033+
}
5034+
location := "europe-west2"
5035+
if len(os.Getenv("SHUFFLE_GCEPROJECT")) > 0 && len(os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")) > 0 {
5036+
location = os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")
5037+
}
5038+
req := &schedulerpb.GetJobRequest{
5039+
Name: fmt.Sprintf("projects/%s/locations/%s/jobs/schedule_%s", gceProject, location, id),
5040+
}
5041+
resp, err := c.GetJob(ctx, req)
5042+
if err != nil {
5043+
log.Printf("[ERROR] Failed getting schedule %s: %s", id, err)
5044+
return schedule, err
5045+
}
5046+
schedule.Id = id
5047+
schedule.Name = resp.Name
5048+
if resp.State == schedulerpb.Job_ENABLED {
5049+
schedule.Status = "running"
5050+
} else {
5051+
schedule.Status = "stopped"
5052+
}
5053+
5054+
// Set cache for 5 minutes just to make it fast
5055+
scheduleJSON, err := json.Marshal(schedule)
5056+
if err != nil {
5057+
log.Printf("[ERROR] Failed to marshal schedule for cache: %s", err)
5058+
return schedule, err
5059+
}
5060+
err = SetCache(ctx, fmt.Sprintf("schedule-%s", id), scheduleJSON, 300)
5061+
if err != nil {
5062+
log.Printf("[ERROR] Failed setting cache for schedule %s: %s", id, err)
5063+
}
5064+
5065+
return schedule, nil
5066+
}
5067+
49805068
func HandleGetSchedules(resp http.ResponseWriter, request *http.Request) {
49815069
cors := HandleCors(resp, request)
49825070
if cors {
@@ -6137,7 +6225,7 @@ func diffWorkflowWrapper(parentWorkflow Workflow) Workflow {
61376225
}
61386226

61396227
func subflowPropagationWrapper(parentWorkflow Workflow, childWorkflow Workflow, parentTrigger Trigger) Trigger {
6140-
// remember: when this function is used, the parent trigger is passed to
6228+
// remember: when this function is used, the parent trigger is passed to
61416229
// create the new child trigger.
61426230
trigger := parentTrigger
61436231

@@ -6179,13 +6267,13 @@ func subflowPropagationWrapper(parentWorkflow Workflow, childWorkflow Workflow,
61796267
alreadyPropagatedSubflow := ""
61806268

61816269
for _, workflow := range childOrgWorkflows {
6182-
// this means that the subflow has been propagated to
6270+
// this means that the subflow has been propagated to
61836271
// child workflow already. no need to complicate things further.
61846272
if workflow.ParentWorkflowId == parentSubflowPointedId {
61856273
propagatedEarlier = true
61866274
alreadyPropagatedSubflow = workflow.ID
61876275
break
6188-
}
6276+
}
61896277
}
61906278

61916279
if propagatedEarlier {
@@ -6280,7 +6368,6 @@ func subflowPropagationWrapper(parentWorkflow Workflow, childWorkflow Workflow,
62806368
return trigger
62816369
}
62826370

6283-
62846371
func deleteScheduleGeneral(ctx context.Context, scheduleId string) error {
62856372
schedule, err := GetSchedule(ctx, scheduleId)
62866373
if err != nil {
@@ -6322,7 +6409,7 @@ func deleteScheduleGeneral(ctx context.Context, scheduleId string) error {
63226409
} else if project.Environment == "cloud" && schedule.Environment == "onprem" {
63236410
// hybrid case
63246411
// TODO: to be handled
6325-
} else if project.Environment == "onprem" && (schedule.Environment == "cloud" ) {
6412+
} else if project.Environment == "onprem" && (schedule.Environment == "cloud") {
63266413
scheduleWorkflow, err := GetWorkflow(ctx, schedule.WorkflowId)
63276414
if err != nil {
63286415
log.Printf("[WARNING] Failed getting schedule workflow %s: %s", schedule.WorkflowId, err)
@@ -6363,13 +6450,12 @@ func deleteScheduleGeneral(ctx context.Context, scheduleId string) error {
63636450
return nil
63646451
}
63656452

6366-
63676453
func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
63686454
// Check if there is a difference in actions, and what they are
63696455
// Check if there is a difference in triggers, and what they are
63706456
// Check if there is a difference in branches, and what they are
63716457

6372-
// We create a new ID for each trigger.
6458+
// We create a new ID for each trigger.
63736459
// Older ID is stored in trigger.ReplacementForTrigger
63746460
nameChanged := false
63756461
descriptionChanged := false
@@ -6563,7 +6649,7 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
65636649
}
65646650
}
65656651

6566-
// checks if parentWorkflow removed a trigger
6652+
// checks if parentWorkflow removed a trigger
65676653
// that was distributed to child workflow.
65686654
for _, oldAction := range oldWorkflow.Triggers {
65696655
if !oldAction.ParentControlled {
@@ -6607,11 +6693,11 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
66076693
// continue
66086694
// }
66096695

6610-
if oldAction.ReplacementForTrigger != newAction.ID {
6696+
if oldAction.ReplacementForTrigger != newAction.ID {
66116697
continue
66126698
}
66136699

6614-
changeType, changed := hasTriggerChanged(newAction, oldAction)
6700+
changeType, changed := hasTriggerChanged(newAction, oldAction)
66156701
if changed {
66166702
log.Printf("[DEBUG] Trigger %s (%s) has changed in '%s'", newAction.Label, newAction.ID, changeType)
66176703
// updatedTriggers always has parent workflow's new trigger.
@@ -6807,12 +6893,52 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
68076893
for _, action := range updatedActions {
68086894
for index, childAction := range childWorkflow.Actions {
68096895
if childAction.ID != action.ID {
6896+
// this means it's a new action
68106897
continue
68116898
}
68126899

68136900
// FIXME:
68146901
// Make sure it changes things such as environment & app auth appropriately
68156902

6903+
// implement by default parameter "unlocking"
6904+
// ie: unless action itself is changed,
6905+
// preserve child paramter.
6906+
6907+
// Also, allow a "locking" mechanism that
6908+
// overwrites to child if enabled.
6909+
finalLocks := []ParameterLock{}
6910+
finalParamters := action.Parameters
6911+
6912+
// lock clean up in incoming action
6913+
for _, lock := range action.ParameterLocks {
6914+
if action.Name == lock.ActionName {
6915+
finalLocks = append(finalLocks, lock)
6916+
}
6917+
}
6918+
6919+
action.ParameterLocks = finalLocks
6920+
if len(action.ParameterLocks) == 0 {
6921+
log.Printf("[DEBUG] No locks found for action %s", action.ID)
6922+
if childAction.Name == action.Name {
6923+
log.Printf("[DEBUG] Action %s is the same as child action %s", action.ID, childAction.ID)
6924+
// so, action itself is not changed
6925+
// preserve child parameters by default
6926+
finalParamters = childAction.Parameters
6927+
} else {
6928+
log.Printf("[DEBUG] Action %s (Name: %s, AppID: %s) is different from child action %s (Name: %s, AppID: %s)", action.ID, action.Name, action.AppID, childAction.ID, childAction.Name, childAction.AppID)
6929+
}
6930+
} else {
6931+
for _, lock := range action.ParameterLocks {
6932+
for paramIndex, param := range finalParamters {
6933+
if param.Name == lock.ParameterName && action.Name == lock.ActionName {
6934+
finalParamters[paramIndex].Value = childAction.Parameters[paramIndex].Value
6935+
}
6936+
}
6937+
}
6938+
}
6939+
6940+
action.Parameters = finalParamters
6941+
68166942
childWorkflow.Actions[index] = action
68176943
break
68186944
}
@@ -6883,7 +7009,7 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
68837009
newChildTriggers := childTriggers
68847010
for _, trigger := range childWorkflow.Triggers {
68857011
if ArrayContains(removedTriggers, trigger.ID) {
6886-
// while removing triggers,
7012+
// while removing triggers,
68877013
// make sure to stop them as well
68887014

68897015
// need to handle this better
@@ -6895,7 +7021,7 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
68957021
hook, err := GetHook(ctx, trigger.ID)
68967022
if err == nil {
68977023
// this anyhow, means it is a webhook
6898-
err = DeleteKey(ctx, "hooks", hook.Id)
7024+
err = DeleteKey(ctx, "hooks", hook.Id)
68997025
if err != nil {
69007026
log.Printf("[WARNING] Failed deleting hook: %s", err)
69017027
}
@@ -6946,7 +7072,7 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
69467072
// FIXME:
69477073
// Make sure it changes things such as URL & references properly
69487074
if action.TriggerType == "WEBHOOK" {
6949-
// make sure to only override: name, label, position,
7075+
// make sure to only override: name, label, position,
69507076
// app_version, startnode and nothing else
69517077

69527078
childWorkflow.Triggers[index].Name = action.Name
@@ -6983,12 +7109,12 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
69837109
childWorkflow.Triggers[index].AppVersion = action.AppVersion
69847110

69857111
// essentially, now we try to verify:
6986-
// okay, new workflow? we see it's a subflow that's
7112+
// okay, new workflow? we see it's a subflow that's
69877113
// what changed? is it the workflow?
69887114

69897115
action = subflowPropagationWrapper(parentWorkflow, childWorkflow, action)
69907116
childWorkflow.Triggers[index].Parameters = action.Parameters
6991-
break
7117+
break
69927118
}
69937119

69947120
childWorkflow.Triggers[index] = action
@@ -10437,6 +10563,38 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
1043710563
}
1043810564
}
1043910565

10566+
//Check if workflow trigger schedule is in sync with the gcp cron job
10567+
if workflow.Triggers != nil {
10568+
var wg sync.WaitGroup
10569+
triggerMutex := sync.Mutex{}
10570+
10571+
for index, trigger := range workflow.Triggers {
10572+
if trigger.TriggerType == "SCHEDULE" {
10573+
wg.Add(1)
10574+
go func(index int, trigger Trigger) {
10575+
defer wg.Done()
10576+
10577+
// Check if the schedule is in sync with the gcp cron job
10578+
GcpSchedule, err := GetGcpSchedule(ctx, trigger.ID)
10579+
if err != nil {
10580+
log.Printf("[ERROR] Failed getting gcp schedule for trigger %s: %s", trigger.ID, err)
10581+
10582+
triggerMutex.Lock()
10583+
workflow.Triggers[index].Status = "stopped"
10584+
triggerMutex.Unlock()
10585+
} else {
10586+
triggerMutex.Lock()
10587+
workflow.Triggers[index].Status = GcpSchedule.Status
10588+
triggerMutex.Unlock()
10589+
}
10590+
}(index, trigger)
10591+
}
10592+
}
10593+
10594+
wg.Wait()
10595+
SetWorkflow(ctx, *workflow, workflow.ID)
10596+
}
10597+
1044010598
log.Printf("[INFO] Got new version of workflow %s (%s) for org %s and user %s (%s). Actions: %d, Triggers: %d", workflow.Name, workflow.ID, user.ActiveOrg.Id, user.Username, user.Id, len(workflow.Actions), len(workflow.Triggers))
1044110599

1044210600
body, err := json.Marshal(workflow)
@@ -25686,7 +25844,7 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
2568625844

2568725845
//RunAiQuery(systemMessage, userMessage)
2568825846

25689-
partialMatch := true
25847+
partialMatch := true
2569025848
availableLabels := []string{}
2569125849

2569225850
matchName := strings.ReplaceAll(strings.ToLower(strings.TrimSpace(value.AppName)), " ", "_")
@@ -25749,11 +25907,11 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
2574925907
log.Printf("[DEBUG] Found app - checking label: %s vs %s (%s)", app.Name, value.AppName, app.ID)
2575025908
//selectedAction, selectedCategory, availableLabels = GetActionFromLabel(ctx, selectedApp, value.Label, true)
2575125909
selectedAction, selectedCategory, availableLabels = GetActionFromLabel(ctx, app, value.Label, true)
25752-
partialMatch = false
25910+
partialMatch = false
2575325911

2575425912
break
2575525913

25756-
// Finds a random match, but doesn't break in case it finds exact
25914+
// Finds a random match, but doesn't break in case it finds exact
2575725915
} else if selectedApp.ID == "" && len(matchName) > 0 && (strings.Contains(appName, matchName) || strings.Contains(matchName, appName)) {
2575825916
selectedApp = app
2575925917

@@ -26410,7 +26568,6 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
2641026568

2641126569
client := GetExternalClient(baseUrl)
2641226570

26413-
2641426571
selectedAction.AppName = selectedApp.Name
2641526572
selectedAction.AppID = selectedApp.ID
2641626573
selectedAction.AppVersion = selectedApp.AppVersion
@@ -26789,7 +26946,6 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
2678926946
return
2679026947
}
2679126948

26792-
2679326949
// Ensures frontend has something to debug if things go wrong
2679426950
for key, value := range newresp.Header {
2679526951
if strings.HasSuffix(strings.ToLower(key), "-url") {
@@ -26818,7 +26974,7 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
2681826974

2681926975
httpOutput, marshalledBody, httpParseErr := FindHttpBody(apprunBody)
2682026976
//log.Printf("\n\nGOT RESPONSE (%d): %s. STATUS: %d\n\n", newresp.StatusCode, string(apprunBody), httpOutput.Status)
26821-
if successStruct.Success == false && len(successStruct.Reason) > 0 && httpOutput.Status == 0 && strings.Contains(strings.ReplaceAll(string(apprunBody), " ", ""), `"success":false`){
26977+
if successStruct.Success == false && len(successStruct.Reason) > 0 && httpOutput.Status == 0 && strings.Contains(strings.ReplaceAll(string(apprunBody), " ", ""), `"success":false`) {
2682226978
log.Printf("[WARNING][AI] Failed running app %s (%s). Contact support. Reason: %s", selectedAction.Name, selectedAction.AppID, successStruct.Reason)
2682326979

2682426980
resp.WriteHeader(400)
@@ -27441,7 +27597,7 @@ func GetActionFromLabel(ctx context.Context, app WorkflowApp, label string, fixL
2744127597
}
2744227598

2744327599
if newLabel == lowercaseLabel {
27444-
exactMatch = true
27600+
exactMatch = true
2744527601
break
2744627602
}
2744727603
}
@@ -27452,7 +27608,7 @@ func GetActionFromLabel(ctx context.Context, app WorkflowApp, label string, fixL
2745227608
}
2745327609
}
2745427610

27455-
// Decides if we are to autocomplete the app if labels are not found
27611+
// Decides if we are to autocomplete the app if labels are not found
2745627612
if len(selectedAction.ID) == 0 {
2745727613
if fixLabels == true {
2745827614
//log.Printf("\n\n[DEBUG] Action not found in app %s (%s) for label '%s'. Autodiscovering and updating the app!!!\n\n", app.Name, app.ID, label)

0 commit comments

Comments
 (0)