Skip to content

Commit 67bc5c2

Browse files
committed
Many more AI Agent cleanup mechanisms
1 parent 84792aa commit 67bc5c2

File tree

4 files changed

+137
-15
lines changed

4 files changed

+137
-15
lines changed

ai.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6675,9 +6675,20 @@ SINGUL ACTIONS:
66756675
oldAgentOutput = mappedResult
66766676
previousAnswers := ""
66776677
relevantDecisions := []AgentDecision{}
6678+
6679+
hasFailure := false
66786680
for _, mappedDecision := range mappedResult.Decisions {
6681+
if mappedDecision.RunDetails.Status == "FAILURE" {
6682+
// Overrides as to get the correct index
6683+
if lastFinishedIndex < mappedDecision.I {
6684+
lastFinishedIndex = mappedDecision.I
6685+
}
6686+
6687+
hasFailure = true
6688+
}
6689+
66796690
if mappedDecision.RunDetails.Status != "FINISHED" && mappedDecision.RunDetails.Status != "SUCCESS" {
6680-
log.Printf("[DEBUG][%s] SKIPPING decision index %d with status %s", execution.ExecutionId, mappedDecision.I, mappedDecision.RunDetails.Status)
6691+
log.Printf("[DEBUG][%s] SKIPPING decision index %d (%s) with status %s", execution.ExecutionId, mappedDecision.I, mappedDecision.RunDetails.Id, mappedDecision.RunDetails.Status)
66816692
continue
66826693
}
66836694

@@ -6705,6 +6716,10 @@ SINGUL ACTIONS:
67056716
break
67066717
}
67076718

6719+
if hasFailure {
6720+
userMessage += "\n\nSome of the previous decisions failed. Finalise the agent.\n\n"
6721+
}
6722+
67086723
userMessage += fmt.Sprintf("\n\nPrevious decision results:\n%s", string(marshalledDecisions))
67096724
if len(previousAnswers) > 0 {
67106725
userMessage += fmt.Sprintf("\n\nAnswers to questions:\n%s", previousAnswers)
@@ -6767,6 +6782,7 @@ RULES:
67676782
* Do NOT add unnecessary fields; only include fields required for the action.
67686783
* Fields can reference previous action outputs using {{action_name}}. Example: {"body": "{{previous_action.field}}"}.
67696784
* If questions are absolutely required, combine all into one "ask" action with multiple "question" fields. Do NOT create multiple separate decisions.
6785+
* If any decision has failed, add the finish decision.
67706786
67716787
END RULES
67726788
---
@@ -7202,7 +7218,19 @@ FINALISING:
72027218
agentOutput.Status = "RUNNING"
72037219
}
72047220

7205-
log.Printf("Got %d NEW decision(s)", len(mappedDecisions))
7221+
if debug {
7222+
log.Printf("[DEBUG] Got %d NEW decision(s)", len(mappedDecisions))
7223+
}
7224+
7225+
// Verbose error handling optimisations
7226+
for _, mappedDecision := range mappedDecisions {
7227+
if mappedDecision.I == lastFinishedIndex && mappedDecision.RunDetails.Status == "FAILURE" {
7228+
if debug {
7229+
log.Printf("\n\n\n\n\nMAPPING TO FAILURE DUE TO DECISION INDEX AND STATUS!!! Decisions that aren't 'finalise' should be ignored\n\n\n\n\n\n\n")
7230+
}
7231+
}
7232+
}
7233+
72067234
additions := 0
72077235
for _, mappedDecision := range mappedDecisions {
72087236
if mappedDecision.I < lastFinishedIndex {
@@ -7257,8 +7285,9 @@ FINALISING:
72577285
}
72587286

72597287
if resultMapping.Status == "FAILURE" {
7260-
agentOutput.Status = "FAILURE"
7261-
agentOutput.CompletedAt = time.Now().Unix()
7288+
log.Printf("\n\n\n\n\nMAPPING TO FAILURE!!!\n\n\nn\n\n\n\n")
7289+
//agentOutput.Status = "FAILURE"
7290+
//agentOutput.CompletedAt = time.Now().Unix()
72627291
}
72637292

72647293
if !createNextActions {
@@ -7315,6 +7344,11 @@ FINALISING:
73157344

73167345

73177346
agentOutput.Output = decision.Reason
7347+
agentOutput.Status = "FINISHED"
7348+
agentOutput.CompletedAt = time.Now().Unix()
7349+
7350+
//workflowExecution.Results[resultIndex].Status = "SUCCESS"
7351+
//go sendAgentActionSelfRequest("SUCCESS", workflowExecution, workflowExecution.Results[resultIndex])
73187352

73197353
} else if decision.Action == "ask" || decision.Action == "question" {
73207354
agentOutput.Decisions[decisionIndex].RunDetails.StartedAt = time.Now().Unix()
@@ -7426,10 +7460,26 @@ FINALISING:
74267460
}
74277461
}
74287462

7463+
7464+
if agentOutput.Status == "FINISHED" && agentOutput.CompletedAt > 0 && execution.Status == "EXECUTING" {
7465+
log.Printf("[INFO][%s] AI Agent action %s finished.", execution.ExecutionId, startNode.ID)
7466+
for resultIndex, result := range execution.Results {
7467+
if result.Action.ID != startNode.ID {
7468+
continue
7469+
}
7470+
7471+
execution.Results[resultIndex].Status = "SUCCESS"
7472+
execution.Results[resultIndex].CompletedAt = agentOutput.CompletedAt
7473+
go sendAgentActionSelfRequest("SUCCESS", execution, execution.Results[resultIndex])
7474+
break
7475+
}
7476+
}
7477+
74297478
} else {
74307479
log.Printf("[ERROR] No result found in AI agent response. Status: %d. Body: %s", newresp.StatusCode, string(body))
74317480
}
74327481

7482+
74337483
if memorizationEngine == "shuffle_db" {
74347484
requestKey := fmt.Sprintf("chat_%s_%s", execution.ExecutionId, startNode.ID)
74357485

cloudSync.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2186,7 +2186,10 @@ func RunAgentDecisionSingulActionHandler(execution WorkflowExecution, decision A
21862186
appId := ""
21872187
_ = appId
21882188
for key, value := range resp.Header {
2189-
log.Printf("\n\n\n\n[DEBUG][%s] HEADER: key: %s, value: %s\n\n\n\n", execution.ExecutionId, key, value)
2189+
//if debug {
2190+
// log.Printf("\n\n\n\n[DEBUG][%s] HEADER: key: %s, value: %s\n\n\n\n", execution.ExecutionId, key, value)
2191+
//}
2192+
21902193
if key == "X-Appname" && len(value) > 0 {
21912194
appname = value[0]
21922195
continue
@@ -2335,7 +2338,7 @@ func RunAgentDecisionAction(execution WorkflowExecution, agentOutput AgentOutput
23352338

23362339
go SetCache(ctx, decisionId, marshalledDecision, 60)
23372340

2338-
if decision.Action == "user_input" || decision.Action == "ask" || decision.Action == "question" || decision.Action == "finish" || decision.Category == "standalone" {
2341+
if decision.Action == "user_input" || decision.Action == "answer" || decision.Action == "ask" || decision.Action == "question" || decision.Action == "finish" || decision.Category == "standalone" {
23392342
} else {
23402343
// Singul handler
23412344
rawResponse, debugUrl, appname, err := RunAgentDecisionSingulActionHandler(execution, decision)

db-connector.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1263,6 +1263,7 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor
12631263
finishedDecisions = append(finishedDecisions, decision.RunDetails.Id)
12641264
continue
12651265
} else if decision.RunDetails.Status == "FAILURE" {
1266+
//finishedDecisions = append(finishedDecisions, decision.RunDetails.Id)
12661267
failedFound = true
12671268
continue
12681269
}
@@ -1285,16 +1286,23 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor
12851286
}
12861287
}
12871288

1289+
// FIXME: Is failure hadnling here necessary?
1290+
// Changed it to do failure handling better in the agent itself
1291+
// due to having a 'finish' action that should handle it properly
12881292
if failedFound {
12891293
decisionsUpdated = true
12901294

1295+
/*
12911296
mappedOutput.Status = "FAILURE"
12921297
mappedOutput.CompletedAt = time.Now().Unix()
12931298
workflowExecution.Results[resultIndex].Status = "ABORTED"
12941299
12951300
go sendAgentActionSelfRequest("FAILURE", workflowExecution, workflowExecution.Results[resultIndex])
1301+
*/
1302+
1303+
}
12961304

1297-
} else if len(finishedDecisions) == len(mappedOutput.Decisions) && mappedOutput.Status != "FINISHED" && mappedOutput.Status != "FAILURE" && mappedOutput.Status != "ABORTED" {
1305+
if len(finishedDecisions) == len(mappedOutput.Decisions) && mappedOutput.Status != "FINISHED" && mappedOutput.Status != "FAILURE" && mappedOutput.Status != "ABORTED" {
12981306
decisionsUpdated = true
12991307
mappedOutput.Status = "FINISHED"
13001308
mappedOutput.CompletedAt = time.Now().Unix()

shared.go

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16226,6 +16226,15 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action
1622616226
if err != nil {
1622716227
log.Printf("[ERROR][%s] Failed unmarshalling agent decision result for decision ID '%s': %s. Data: %s", workflowExecution.ExecutionId, decisionId, err, actionResult.Result)
1622816228
} else {
16229+
if newDecision.Action == "answer" {
16230+
if debug {
16231+
log.Printf("[DEBUG] Auto-finishing 'answer' decision for decision ID '%s'", decisionId)
16232+
}
16233+
16234+
newDecision.RunDetails.Status = "FINISHED"
16235+
mappedResult.Decisions[decisionIdResultIndex] = newDecision
16236+
}
16237+
1622916238
if newDecision.RunDetails.Status != "" && newDecision.RunDetails.Id != "" && (newDecision.RunDetails.Status != mappedResult.Decisions[decisionIdResultIndex].RunDetails.Status || newDecision.RunDetails.StartedAt != mappedResult.Decisions[decisionIdResultIndex].RunDetails.StartedAt || newDecision.RunDetails.CompletedAt != mappedResult.Decisions[decisionIdResultIndex].RunDetails.CompletedAt) {
1623016239

1623116240
log.Printf("[DEBUG][%s] Updating decision ID '%s' with new status '%s' (old: '%s')", workflowExecution.ExecutionId, decisionId, newDecision.RunDetails.Status, mappedResult.Decisions[decisionIdResultIndex].RunDetails.Status)
@@ -16250,9 +16259,12 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action
1625016259
}
1625116260

1625216261
if mappedResult.Decisions[decisionIdResultIndex].RunDetails.Status == "FAILURE" || mappedResult.Decisions[decisionIdResultIndex].RunDetails.Status == "ABORTED" {
16262+
if debug {
16263+
log.Printf("[DEBUG] Auto-failing agent due to decision ID '%s' being in status '%s'", decisionId, mappedResult.Decisions[decisionIdResultIndex].RunDetails.Status)
16264+
}
1625316265

16254-
go sendAgentActionSelfRequest("FAILURE", workflowExecution, workflowExecution.Results[foundActionResultIndex])
16255-
return &workflowExecution, false, nil
16266+
//go sendAgentActionSelfRequest("FAILURE", workflowExecution, workflowExecution.Results[foundActionResultIndex])
16267+
//return &workflowExecution, false, nil
1625616268
}
1625716269

1625816270
//mappedResult.Decisions[decisionIdResultIndex] = actionResult.Result
@@ -16261,6 +16273,12 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action
1626116273
allFinishedDecisions := []string{}
1626216274
for decisionId, curDecision := range mappedResult.Decisions {
1626316275
if curDecision.RunDetails.Status == "FINISHED" {
16276+
allFinishedDecisions = append(allFinishedDecisions, curDecision.RunDetails.Id)
16277+
} else if curDecision.RunDetails.Status == "FAILURE" {
16278+
if debug {
16279+
log.Printf("[DEBUG] Treating decision ID '%s' as finished due to FAILURE status", curDecision.RunDetails.Id)
16280+
}
16281+
1626416282
allFinishedDecisions = append(allFinishedDecisions, curDecision.RunDetails.Id)
1626516283
}
1626616284

@@ -16298,8 +16316,8 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action
1629816316
if len(failedDecisions) > 0 {
1629916317
log.Printf("[WARNING][%s] Failed decision found. Should exit out agent %s. It should have exited before this point.", workflowExecution.ExecutionId, decisionId)
1630016318

16301-
go sendAgentActionSelfRequest("FAILURE", workflowExecution, workflowExecution.Results[foundActionResultIndex])
16302-
break
16319+
//go sendAgentActionSelfRequest("FAILURE", workflowExecution, workflowExecution.Results[foundActionResultIndex])
16320+
//break
1630316321
}
1630416322

1630516323
if len(foundDecisions) == len(finishedDecisions) {
@@ -20639,7 +20657,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by
2063920657
// 1. Find the decision & reset cache
2064020658
// 2. Update the execution itself to not have the relevant data
2064120659
if len(decisionId) > 0 {
20642-
log.Printf("[DEBUG][%s] Handling Single action rerun for AI Agent decision. DecisionID: %#v", oldExec.ExecutionId, decisionId)
20660+
log.Printf("[DEBUG][%s] Handling Single action RERUN for AI Agent decision. DecisionID: %#v", oldExec.ExecutionId, decisionId)
2064320661

2064420662
if foundResultIndex == -1 {
2064520663
return workflowExecution, errors.New("Failed to find the action. Please try again or contact [email protected] if this persists.")
@@ -20653,26 +20671,69 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by
2065320671

2065420672
availableDecisions := []string{}
2065520673
foundDecisionIndex := -1
20674+
decisionPosition := -1
20675+
20676+
newDecisions := []AgentDecision{}
2065620677
for decisionIndex, decision := range mappedOutput.Decisions {
2065720678
availableDecisions = append(availableDecisions, decision.RunDetails.Id)
2065820679
if decision.RunDetails.Id != decisionId {
20680+
if decision.Action == "finish" {
20681+
if debug {
20682+
log.Printf("[DEBUG] Removing the 'finish' action due to rerun")
20683+
}
20684+
20685+
continue
20686+
}
20687+
20688+
newDecisions = append(newDecisions, decision)
20689+
2065920690
continue
2066020691
}
2066120692

20662-
foundDecisionIndex = decisionIndex
20663-
20693+
// The position in the hierarchy
20694+
decisionPosition = mappedOutput.Decisions[decisionIndex].I
20695+
foundDecisionIndex = decisionIndex
2066420696
mappedOutput.CompletedAt = 0
2066520697
mappedOutput.Decisions[decisionIndex].RunDetails.Status = "RUNNING"
2066620698
mappedOutput.Decisions[decisionIndex].RunDetails.CompletedAt = 0
2066720699
mappedOutput.Decisions[decisionIndex].RunDetails.RawResponse = ""
2066820700
mappedOutput.Decisions[decisionIndex].RunDetails.DebugUrl = ""
20669-
break
20701+
20702+
newDecisions = append(newDecisions, mappedOutput.Decisions[decisionIndex])
2067020703
}
2067120704

2067220705
if foundDecisionIndex == -1 {
2067320706
return workflowExecution, errors.New(fmt.Sprintf("Failed to find and rerun decision '%s' out of '%s' in execution %s. Please try again or contact [email protected] if the error persists.", decisionId, strings.Join(availableDecisions, ","), oldExec.ExecutionId))
2067420707
}
2067520708

20709+
// Removing everything AFTER the one we are currently on
20710+
// Has to be done in a wonky way due to not having ordered arrays
20711+
newNewDecisions := []AgentDecision{}
20712+
for _, newDecision := range newDecisions {
20713+
if decisionPosition != -1 && newDecision.I > decisionPosition && newDecision.RunDetails.Id != decisionId {
20714+
if debug {
20715+
log.Printf("[DEBUG] SKIPPING decision %s as it's after the rerun position", newDecision.RunDetails.Id)
20716+
}
20717+
20718+
continue
20719+
}
20720+
20721+
if newDecision.RunDetails.Status == "" {
20722+
continue
20723+
}
20724+
20725+
newNewDecisions = append(newNewDecisions, newDecision)
20726+
}
20727+
20728+
20729+
for newDecisionIndex, newDecision := range newNewDecisions {
20730+
if newDecision.RunDetails.Id == decisionId {
20731+
foundDecisionIndex = newDecisionIndex
20732+
}
20733+
}
20734+
20735+
mappedOutput.Decisions = newNewDecisions
20736+
2067620737
mappedOutput.Status = "WAITING"
2067720738
marshalledResult, err := json.Marshal(mappedOutput)
2067820739
if err == nil {

0 commit comments

Comments
 (0)