Skip to content

Commit 8c4229b

Browse files
committed
refactor: chatbot listener
1 parent ef41e95 commit 8c4229b

File tree

4 files changed

+249
-254
lines changed

4 files changed

+249
-254
lines changed
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
import { XmtpMessageOutput } from '@app/definitions/integration-definitions/xmtp/xmtp.common'
2+
import { BadRequestException, Body, Controller, Logger, Post, Req, UnauthorizedException } from '@nestjs/common'
3+
import { Request } from 'express'
4+
import { uniq } from 'lodash'
5+
import { ObjectId } from 'mongodb'
6+
import { Types } from 'mongoose'
7+
import { RunnerService } from '../../../../runner/src/services/runner.service'
8+
import { ContactService } from '../../contacts/services/contact.service'
9+
import { IntegrationTrigger } from '../../integration-triggers/entities/integration-trigger'
10+
import { IntegrationTriggerService } from '../../integration-triggers/services/integration-trigger.service'
11+
import { Integration } from '../../integrations/entities/integration'
12+
import { IntegrationService } from '../../integrations/services/integration.service'
13+
import { WorkflowActionService } from '../../workflow-actions/services/workflow-action.service'
14+
import { WorkflowRunStatus } from '../../workflow-runs/entities/workflow-run-status'
15+
import { WorkflowSleep } from '../../workflow-runs/entities/workflow-sleep'
16+
import { WorkflowRunService } from '../../workflow-runs/services/workflow-run.service'
17+
import { WorkflowSleepService } from '../../workflow-runs/services/workflow-sleep.service'
18+
import { Workflow } from '../../workflows/entities/workflow'
19+
import { WorkflowService } from '../../workflows/services/workflow.service'
20+
import { WorkflowTrigger } from '../entities/workflow-trigger'
21+
import { WorkflowTriggerService } from '../services/workflow-trigger.service'
22+
import { WorkflowUsedIdService } from '../services/workflow-used-id.service'
23+
24+
@Controller('/chatbots')
25+
export class ChatbotController {
26+
private readonly logger = new Logger(ChatbotController.name)
27+
28+
private chatbotIntegration: Integration
29+
private chatbotIntegrationTrigger: IntegrationTrigger
30+
private xmtpIntegration: Integration
31+
private xmtpIntegrationTrigger: IntegrationTrigger
32+
33+
constructor(
34+
private readonly integrationService: IntegrationService,
35+
private readonly integrationTriggerService: IntegrationTriggerService,
36+
private readonly workflowService: WorkflowService,
37+
private readonly workflowTriggerService: WorkflowTriggerService,
38+
private readonly workflowActionService: WorkflowActionService,
39+
private readonly workflowRunService: WorkflowRunService,
40+
private readonly runnerService: RunnerService,
41+
private workflowUsedIdService: WorkflowUsedIdService,
42+
private workflowSleepService: WorkflowSleepService,
43+
private contactService: ContactService,
44+
) {}
45+
46+
async onModuleInit() {
47+
this.chatbotIntegration = (await this.integrationService.findOne({ key: 'chatbot' })) as Integration
48+
this.chatbotIntegrationTrigger = (await this.integrationTriggerService.findOne({
49+
integration: this.chatbotIntegration._id,
50+
key: 'newChatbotMessage',
51+
})) as IntegrationTrigger
52+
53+
this.xmtpIntegration = (await this.integrationService.findOne({ key: 'xmtp' })) as Integration
54+
this.xmtpIntegrationTrigger = (await this.integrationTriggerService.findOne({
55+
integration: this.xmtpIntegration._id,
56+
key: 'newMessage',
57+
})) as IntegrationTrigger
58+
}
59+
60+
@Post('/')
61+
async received(@Body() body: Record<string, any>, @Req() req: Request) {
62+
if (req.headers?.authorization !== process.env.CHATBOT_SECRET) {
63+
throw new UnauthorizedException()
64+
}
65+
if (!body.user || !body.message) {
66+
throw new BadRequestException()
67+
}
68+
69+
const chatbotWorkflowTriggers = await this.workflowTriggerService.find({
70+
owner: new ObjectId(body.user),
71+
integrationTrigger: this.chatbotIntegrationTrigger._id,
72+
enabled: true,
73+
planLimited: { $ne: true },
74+
})
75+
const chatbotPromises = chatbotWorkflowTriggers.map(async (workflowTrigger) =>
76+
this.processChatbotMessage(body.message, workflowTrigger),
77+
)
78+
await Promise.all(chatbotPromises)
79+
80+
const xmtpWorkflowTriggers = await this.workflowTriggerService.find({
81+
owner: new ObjectId(body.user),
82+
integrationTrigger: this.xmtpIntegrationTrigger._id,
83+
enabled: true,
84+
planLimited: { $ne: true },
85+
})
86+
const xmtpPromises = xmtpWorkflowTriggers.map(async (workflowTrigger) =>
87+
this.processXmtpMessage(body.message, workflowTrigger),
88+
)
89+
await Promise.all(xmtpPromises)
90+
91+
return { ok: true }
92+
}
93+
94+
async processChatbotMessage(message: XmtpMessageOutput, workflowTrigger: WorkflowTrigger) {
95+
await this.workflowUsedIdService.createOne({
96+
workflow: workflowTrigger.workflow,
97+
triggerId: message.id,
98+
})
99+
100+
const workflow = await this.workflowService.findOne({ _id: workflowTrigger.workflow })
101+
if (!workflow) {
102+
return
103+
}
104+
105+
this.logger.log(`Processing chatbot message: ${message.id} for workflow: ${workflow._id}`)
106+
107+
const workflowSleeps = await this.workflowSleepService.find({
108+
workflow: workflowTrigger.workflow,
109+
uniqueGroup: message.conversation.id,
110+
})
111+
112+
// continue previous conversation
113+
if (workflowSleeps.length > 0) {
114+
void this.continueConversation(workflow, workflowTrigger, workflowSleeps, message)
115+
return
116+
}
117+
118+
const tags = workflowTrigger.inputs?.tags?.split(',').map((tag) => tag.trim()) ?? []
119+
const contact = await this.contactService.findOne({
120+
owner: workflow.owner,
121+
address: message.senderAddress,
122+
})
123+
if (!contact) {
124+
await this.contactService.createOne({
125+
owner: workflow.owner,
126+
address: message.senderAddress,
127+
tags,
128+
})
129+
} else if (workflowTrigger.inputs?.tags) {
130+
const newTags = uniq([...contact.tags, ...tags])
131+
if (newTags.length !== contact.tags.length) {
132+
await this.contactService.updateById(contact._id, {
133+
tags: contact.tags,
134+
})
135+
}
136+
}
137+
138+
const hookTriggerOutputs = {
139+
id: message.id,
140+
outputs: {
141+
[workflowTrigger.id]: message as Record<string, any>,
142+
trigger: message as Record<string, any>,
143+
contact: {
144+
address: message.senderAddress,
145+
},
146+
},
147+
}
148+
const rootActions = await this.workflowActionService.find({ workflow: workflow._id, isRootAction: true })
149+
const workflowRun = await this.workflowRunService.createOneByInstantTrigger(
150+
this.chatbotIntegration,
151+
this.chatbotIntegrationTrigger,
152+
workflow,
153+
workflowTrigger,
154+
rootActions.length > 0,
155+
)
156+
await this.workflowTriggerService.updateById(workflowTrigger._id, {
157+
lastId: message.id,
158+
lastItem: message,
159+
})
160+
void this.runnerService.runWorkflowActions(rootActions, [hookTriggerOutputs], workflowRun)
161+
}
162+
163+
async continueConversation(
164+
workflow: Workflow,
165+
workflowTrigger: WorkflowTrigger,
166+
workflowSleeps: WorkflowSleep[],
167+
outputs: XmtpMessageOutput,
168+
) {
169+
const workflowSleep = workflowSleeps[0]
170+
171+
// clean up
172+
await this.workflowSleepService.deleteManyNative({
173+
_id: {
174+
$in: workflowSleeps.map((workflowSleep) => workflowSleep._id),
175+
},
176+
})
177+
178+
this.logger.log(`Continuing chatbot conversation ${workflowSleep.id} for workflow ${workflowTrigger.workflow}`)
179+
180+
const workflowAction = await this.workflowActionService.findById(workflowSleep.workflowAction.toString())
181+
const workflowRun = await this.workflowRunService.findById(workflowSleep.workflowRun.toString())
182+
183+
if (!workflowAction || !workflowRun) {
184+
this.logger.error(`Missing workflow action or workflow run for workflow sleep ${workflowSleep.id}`)
185+
await this.workflowRunService.updateById(workflowSleep._id, { status: WorkflowRunStatus.failed })
186+
return
187+
}
188+
189+
await this.workflowRunService.wakeUpWorkflowRun(workflowRun)
190+
const nextActionInputs = {
191+
...(workflowSleep.nextActionInputs ?? {}),
192+
[workflowAction.id]: {
193+
...((workflowSleep.nextActionInputs?.[workflowAction.id] as any) ?? {}),
194+
responseId: outputs.id,
195+
responseContent: outputs.content,
196+
},
197+
} as Record<string, Record<string, unknown>>
198+
const actions = await this.workflowActionService.findByIds(
199+
workflowAction.nextActions.map((next) => next.action) as Types.ObjectId[],
200+
)
201+
const promises = actions.map((action) =>
202+
this.runnerService.runWorkflowActionsTree(workflow, action, nextActionInputs, workflowRun, workflowSleep.itemId),
203+
)
204+
void Promise.all(promises).then(() => {
205+
return this.workflowRunService.markWorkflowRunAsCompleted(workflowRun._id)
206+
})
207+
}
208+
209+
async processXmtpMessage(message: XmtpMessageOutput, workflowTrigger: WorkflowTrigger) {
210+
await this.workflowUsedIdService.createOne({
211+
workflow: workflowTrigger.workflow,
212+
triggerId: message.id,
213+
})
214+
215+
const workflow = await this.workflowService.findOne({ _id: workflowTrigger.workflow })
216+
if (!workflow) {
217+
return
218+
}
219+
220+
this.logger.log(`Processing xmtp message: ${message.id} for workflow: ${workflow._id}`)
221+
222+
const hookTriggerOutputs = {
223+
id: message.id,
224+
outputs: {
225+
[workflowTrigger.id]: message as Record<string, any>,
226+
trigger: message as Record<string, any>,
227+
},
228+
}
229+
230+
const rootActions = await this.workflowActionService.find({ workflow: workflow._id, isRootAction: true })
231+
const workflowRun = await this.workflowRunService.createOneByInstantTrigger(
232+
this.xmtpIntegration,
233+
this.xmtpIntegrationTrigger,
234+
workflow,
235+
workflowTrigger,
236+
rootActions.length > 0,
237+
)
238+
await this.workflowTriggerService.updateById(workflowTrigger._id, {
239+
lastId: message.id,
240+
lastItem: message,
241+
})
242+
void this.runnerService.runWorkflowActions(rootActions, [hookTriggerOutputs], workflowRun)
243+
}
244+
}

apps/api/src/workflow-triggers/workflow-triggers.module.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { DefinitionsModule } from '../../../../libs/definitions/src'
55
import { RunnerModule } from '../../../runner/src/runner.module'
66
import { AccountCredentialsModule } from '../account-credentials/account-credentials.module'
77
import { AuthModule } from '../auth/auth.module'
8+
import { ContactsModule } from '../contacts/contacts.module'
89
import { IntegrationAccountsModule } from '../integration-accounts/integration-accounts.module'
910
import { IntegrationTriggersModule } from '../integration-triggers/integration-triggers.module'
1011
import { IntegrationsModule } from '../integrations/integrations.module'
@@ -13,6 +14,7 @@ import { WorkflowActionsModule } from '../workflow-actions/workflow-actions.modu
1314
import { WorkflowRunsModule } from '../workflow-runs/workflow-runs.module'
1415
import { WorkflowsModule } from '../workflows/workflows.module'
1516
import { ChainJetBotController } from './controllers/chainjetbot.controller'
17+
import { ChatbotController } from './controllers/chatbot.controller'
1618
import { HooksController } from './controllers/hooks.controller'
1719
import { WorkflowTrigger, WorkflowTriggerAuthorizer } from './entities/workflow-trigger'
1820
import { WorkflowUsedId } from './entities/workflow-used-id'
@@ -42,9 +44,10 @@ import { WorkflowUsedIdService } from './services/workflow-used-id.service'
4244

4345
// TODO remove forwardRef once Runner calls are replaced with queues
4446
forwardRef(() => RunnerModule),
47+
ContactsModule,
4548
],
4649
providers: [WorkflowTriggerResolver, WorkflowTriggerService, WorkflowTriggerAuthorizer, WorkflowUsedIdService],
4750
exports: [WorkflowTriggerService, WorkflowUsedIdService],
48-
controllers: [HooksController, ChainJetBotController],
51+
controllers: [HooksController, ChainJetBotController, ChatbotController],
4952
})
5053
export class WorkflowTriggersModule {}

apps/blockchain-listener/src/blockchain-listener.module.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import { WorkflowTriggersModule } from 'apps/api/src/workflow-triggers/workflow-
1717
import { WorkflowsModule } from 'apps/api/src/workflows/workflows.module'
1818
import { RunnerModule } from 'apps/runner/src/runner.module'
1919
import { BlockchainListenerService } from './blockchain-listener.service'
20-
import { ChatbotListenerService } from './chatbot-listener.service'
2120
import { XmtpListenerService } from './xmtp-listener.service'
2221

2322
@Module({
@@ -40,6 +39,6 @@ import { XmtpListenerService } from './xmtp-listener.service'
4039
UserDatabaseModule,
4140
ContactsModule,
4241
],
43-
providers: [BlockchainListenerService, XmtpListenerService, ChatbotListenerService],
42+
providers: [BlockchainListenerService, XmtpListenerService],
4443
})
4544
export class BlockchainListenerModule {}

0 commit comments

Comments
 (0)