Skip to content

Commit 8afa9c2

Browse files
committed
Optimised the finaliser for AI Agents
1 parent 8055987 commit 8afa9c2

File tree

3 files changed

+88
-31
lines changed

3 files changed

+88
-31
lines changed

ai.go

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6645,14 +6645,15 @@ SINGUL ACTIONS:
66456645
extraString = ""
66466646
}
66476647

6648+
66486649
// The starting decision number
66496650
lastFinishedIndex := -1
66506651

66516652
oldActionResult := ActionResult{}
66526653
_ = oldActionResult
66536654
oldAgentOutput := AgentOutput{}
66546655
if createNextActions == true {
6655-
extraString = "This is a continuation of a previous execution. ONLY output decisions that fit AFTER the last FINISHED decision. DO NOT repeat previous decisions, and make sure your indexing is on point. Output as an array of decisions.\n\nIF you don't want to add any new decision, add AT LEAST one decision saying why it is finished, summarising EXACTLY what the user wants in a user-friendly Markdown format, OR the format the user asked for. Make the action and category 'finish', and put the reason in the 'reason' field in a user friendly format."
6656+
extraString = "This is a continuation of a previous execution. ONLY output decisions that fit AFTER the last FINISHED decision. DO NOT repeat previous decisions, and make sure your indexing is on point. Output as an array of decisions.\n\nIF you don't want to add any new decision, add AT LEAST one decision saying why it is finished, summarising EXACTLY what the user wants in a user-friendly Markdown format, OR the format the user asked for. Make the action and category 'finish', and put the reason in the 'reason' field. Do NOT summarize or explain. JUST give exactly the final answer and nothing more."
66566657

66576658
userMessageChanged := false
66586659

@@ -6716,6 +6717,10 @@ SINGUL ACTIONS:
67166717
break
67176718
}
67186719

6720+
if len(userMessage) == 0 && len(oldAgentOutput.OriginalInput) > 0 {
6721+
userMessage = fmt.Sprintf("Original input: %s", oldAgentOutput.OriginalInput)
6722+
}
6723+
67196724
if hasFailure {
67206725
userMessage += "\n\nSome of the previous decisions failed. Finalise the agent.\n\n"
67216726
}
@@ -6724,6 +6729,7 @@ SINGUL ACTIONS:
67246729
if len(previousAnswers) > 0 {
67256730
userMessage += fmt.Sprintf("\n\nAnswers to questions:\n%s", previousAnswers)
67266731
}
6732+
67276733
userMessage += "\n\nBased on the previous decisions, find out if any new decisions need to be added."
67286734
userMessageChanged = true
67296735
}
@@ -7283,7 +7289,6 @@ FINALISING:
72837289
}
72847290

72857291
SetWorkflowExecution(ctx, execution, true)
7286-
//os.Exit(3)
72877292
}
72887293
}
72897294

@@ -7301,6 +7306,8 @@ FINALISING:
73017306
// Ensures we track them along the way
73027307
if len(parsedAgentInput) > 0 {
73037308
agentOutput.Input = parsedAgentInput
7309+
7310+
agentOutput.OriginalInput = userMessage
73047311
}
73057312
}
73067313

@@ -7347,16 +7354,30 @@ FINALISING:
73477354

73487355

73497356
agentOutput.Output = decision.Reason
7357+
for _, decisionField := range decision.Fields {
7358+
if (decisionField.Key == "output" || decisionField.Key == "body") && len(decisionField.Value) > 0 {
7359+
agentOutput.Output = decisionField.Value
7360+
}
7361+
}
7362+
73507363
agentOutput.Status = "FINISHED"
73517364
agentOutput.CompletedAt = time.Now().Unix()
73527365

73537366
//workflowExecution.Results[resultIndex].Status = "SUCCESS"
73547367
//go sendAgentActionSelfRequest("SUCCESS", workflowExecution, workflowExecution.Results[resultIndex])
73557368

7369+
//} else if decision.Action == "answer" {
7370+
// agentOutput.Decisions[decisionIndex].RunDetails.StartedAt = time.Now().Unix()
7371+
// agentOutput.Decisions[decisionIndex].RunDetails.CompletedAt = time.Now().Unix()
7372+
// agentOutput.Decisions[decisionIndex].RunDetails.Status = "FINISHED"
7373+
7374+
//go RunAgentDecisionAction(execution, agentOutput, agentOutput.Decisions[decisionIndex])
7375+
73567376
} else if decision.Action == "ask" || decision.Action == "question" {
73577377
agentOutput.Decisions[decisionIndex].RunDetails.StartedAt = time.Now().Unix()
73587378
agentOutput.Decisions[decisionIndex].RunDetails.Status = "RUNNING"
73597379

7380+
73607381
} else if decision.Category != "standalone" {
73617382
// Do we run the singul action directly?
73627383
agentOutput.Decisions[decisionIndex].RunDetails.StartedAt = time.Now().Unix()
@@ -7366,12 +7387,15 @@ FINALISING:
73667387

73677388

73687389
} else {
7369-
if decision.Category == "standalone" {
73707390

7391+
if decision.Category == "standalone" || decision.Action == "answer" {
73717392
// FIXME: Maybe need to send this to myself
7372-
decision.RunDetails.StartedAt = time.Now().Unix()
7373-
decision.RunDetails.CompletedAt = time.Now().Unix()
7374-
decision.RunDetails.Status = "FINISHED"
7393+
7394+
agentOutput.Decisions[decisionIndex].RunDetails.StartedAt = time.Now().Unix()
7395+
agentOutput.Decisions[decisionIndex].RunDetails.CompletedAt = time.Now().Unix()
7396+
agentOutput.Decisions[decisionIndex].RunDetails.Status = "FINISHED"
7397+
7398+
decision = agentOutput.Decisions[decisionIndex]
73757399

73767400
marshalledDecision, err := json.Marshal(decision)
73777401
if err != nil {
@@ -7391,7 +7415,7 @@ FINALISING:
73917415
Result: string(marshalledDecision),
73927416
}
73937417

7394-
// This is required as the result for the agent isn't set yet on the first run
7418+
// This is required as the result for the agent isn't set yet on the first run. Minor delay to wait up a bit
73957419
if decisionIndex == 0 {
73967420
go func() {
73977421
time.Sleep(2 * time.Second)
@@ -7449,7 +7473,7 @@ FINALISING:
74497473
// These aren't properly being updated in the db, so
74507474
// we need additional logic here to ensure it is being
74517475
// set/started
7452-
if nextActionType == "ask" || nextActionType == "finish" {
7476+
if nextActionType == "ask" || nextActionType == "finish" || nextActionType == "answer" {
74537477
// Ensure we update all of it
74547478
for resultIndex, result := range execution.Results {
74557479
if result.Action.ID != startNode.ID {

db-connector.go

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,12 +1233,12 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor
12331233
continue
12341234
}
12351235

1236-
if innerresult.Status != "WAITING" {
1236+
if innerresult.Status != "WAITING" && innerresult.Status != "SUCCESS" {
12371237
found = true
12381238
result = innerresult
12391239
break
12401240

1241-
} else if innerresult.Status == "WAITING" && (action.AppName == "AI Agent" || action.AppName == "Shuffle Agent") {
1241+
} else if innerresult.Status == "WAITING" || innerresult.Status == "SUCCESS" && (action.AppName == "AI Agent" || action.AppName == "Shuffle Agent") {
12421242
// Auto fixing decision data based on cache for better decisionmaking
12431243
//log.Printf("[DEBUG] Found action result %s with WAITING status", action.AppName)
12441244

@@ -1253,23 +1253,44 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor
12531253

12541254
finishedDecisions := []string{}
12551255
failedFound := false
1256+
finishDecisionFound := false
12561257
// FIXME: Optimize it to not run too far past "FINISHED" on cache searches
12571258
for decisionIndex, decision := range mappedOutput.Decisions {
1259+
if decision.Action == "finish" {
1260+
finishDecisionFound = true
1261+
}
1262+
12581263
if decision.RunDetails.Status == "NOT IMPLEMENTED" {
12591264
continue
12601265
}
12611266

1267+
decisionId := fmt.Sprintf("agent-%s-%s", workflowExecution.ExecutionId, decision.RunDetails.Id)
12621268
if decision.RunDetails.Status == "FINISHED" {
12631269
finishedDecisions = append(finishedDecisions, decision.RunDetails.Id)
12641270
continue
12651271
} else if decision.RunDetails.Status == "FAILURE" {
12661272
//finishedDecisions = append(finishedDecisions, decision.RunDetails.Id)
12671273
failedFound = true
12681274
continue
1275+
} else {
1276+
if decision.RunDetails.CompletedAt > 0 {
1277+
if debug {
1278+
log.Printf("[DEBUG] Rewriting decision %s to FINISHED based on completed at timestamp.", decision.RunDetails.Id)
1279+
}
1280+
1281+
mappedOutput.Decisions[decisionIndex].RunDetails.Status = "FINISHED"
1282+
finishedDecisions = append(finishedDecisions, decision.RunDetails.Id)
1283+
decisionsUpdated = true
1284+
1285+
marshalledDecision, err := json.Marshal(mappedOutput.Decisions[decisionIndex])
1286+
if err == nil {
1287+
err = SetCache(ctx, decisionId, marshalledDecision, 60)
1288+
}
1289+
continue
1290+
}
12691291
}
12701292

12711293
//log.Printf("[DEBUG] Check cache for %s with status %s", decision.RunDetails.Id, decision.RunDetails.Status)
1272-
decisionId := fmt.Sprintf("agent-%s-%s", workflowExecution.ExecutionId, decision.RunDetails.Id)
12731294
cache, err := GetCache(ctx, decisionId)
12741295
if err == nil {
12751296
foundDecision := AgentDecision{}
@@ -1303,12 +1324,23 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor
13031324
}
13041325

13051326
if len(finishedDecisions) == len(mappedOutput.Decisions) && mappedOutput.Status != "FINISHED" && mappedOutput.Status != "FAILURE" && mappedOutput.Status != "ABORTED" {
1306-
decisionsUpdated = true
1307-
mappedOutput.Status = "FINISHED"
1308-
mappedOutput.CompletedAt = time.Now().Unix()
1309-
workflowExecution.Results[resultIndex].Status = "SUCCESS"
1327+
if finishDecisionFound {
1328+
decisionsUpdated = true
1329+
mappedOutput.Status = "FINISHED"
1330+
mappedOutput.CompletedAt = time.Now().Unix()
1331+
1332+
workflowExecution.Results[resultIndex].Status = "SUCCESS"
1333+
go sendAgentActionSelfRequest("SUCCESS", workflowExecution, workflowExecution.Results[resultIndex])
1334+
} else {
1335+
mappedOutput.Status = "WAITING"
1336+
mappedOutput.CompletedAt = 0
1337+
go sendAgentActionSelfRequest("SUCCESS", workflowExecution, workflowExecution.Results[resultIndex])
1338+
1339+
if debug {
1340+
log.Printf("[DEBUG] All decisions finished but no finish action found.")
1341+
}
1342+
}
13101343

1311-
go sendAgentActionSelfRequest("SUCCESS", workflowExecution, workflowExecution.Results[resultIndex])
13121344
}
13131345

13141346
if decisionsUpdated {

structs.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1705,9 +1705,9 @@ type SelectedDetectionRules struct {
17051705
}
17061706

17071707
type DetectionFileInfo struct {
1708-
FileName string `json:"file_name" yaml:"file_name"`
1709-
FileId string `json:"file_id"`
1710-
Tags []string `json:"tags" yaml:"tags"`
1708+
FileName string `json:"file_name" yaml:"file_name"`
1709+
FileId string `json:"file_id"`
1710+
Tags []string `json:"tags" yaml:"tags"`
17111711

17121712
RuleTitle string `json:"title" yaml:"title"`
17131713
Description string `json:"description" yaml:"description"`
@@ -4578,13 +4578,14 @@ type AgentOutput struct {
45784578
// For easy testing
45794579
DecisionString string `json:"decision_string,omitempty" datastore:"decision_string"`
45804580
// For tracking of details parent<->child
4581-
StartedAt int64 `json:"started_at,omitempty" datastore:"started_at"`
4582-
CompletedAt int64 `json:"completed_at,omitempty" datastore:"completed_at"`
4583-
ExecutionId string `json:"execution_id,omitempty" datastore:"execution_id"`
4584-
NodeId string `json:"node_id,omitempty" datastore:"node_id"`
4585-
Memory string `json:"memory,omitempty" datastore:"memory"`
4586-
Input string `json:"input" datastore:"input"`
4587-
Output string `json:"output,omitempty" datastore:"output"`
4581+
StartedAt int64 `json:"started_at,omitempty" datastore:"started_at"`
4582+
CompletedAt int64 `json:"completed_at,omitempty" datastore:"completed_at"`
4583+
ExecutionId string `json:"execution_id,omitempty" datastore:"execution_id"`
4584+
NodeId string `json:"node_id,omitempty" datastore:"node_id"`
4585+
Memory string `json:"memory,omitempty" datastore:"memory"`
4586+
Input string `json:"input" datastore:"input"`
4587+
Output string `json:"output,omitempty" datastore:"output"`
4588+
OriginalInput string `json:"original_input,omitempty" datastore:"original_input"`
45884589
}
45894590

45904591
type HTTPWrapper struct {
@@ -4749,8 +4750,8 @@ type ProcessInfo struct {
47494750
}
47504751

47514752
type UserInfo struct {
4752-
UserID string `json:"user_id"`
4753-
Username string `json:"username"`
4753+
UserID string `json:"user_id"`
4754+
Username string `json:"username"`
47544755
Groups []string `json:"groups,omitempty"`
47554756
}
47564757

@@ -4763,9 +4764,9 @@ type TelemetryConfig struct {
47634764
}
47644765

47654766
type TelemetryFilter struct {
4766-
Type string `json:"type"`
4767-
Include []string `json:"include,omitempty"`
4768-
Exclude []string `json:"exclude,omitempty"`
4767+
Type string `json:"type"`
4768+
Include []string `json:"include,omitempty"`
4769+
Exclude []string `json:"exclude,omitempty"`
47694770
}
47704771

47714772
type AuditLogCollector struct {

0 commit comments

Comments
 (0)