Skip to content

Commit a0796f0

Browse files
improvement(mothership): workflow edits via sockets (#3927)
* improvement(mothership): workflow edits via sockets * make embedded view join room * fix cursor positioning bug
1 parent 98fe4cd commit a0796f0

File tree

4 files changed

+126
-73
lines changed

4 files changed

+126
-73
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -265,14 +265,22 @@ const WorkflowContent = React.memo(
265265
const { fitViewToBounds, getViewportCenter } = useCanvasViewport(reactFlowInstance, {
266266
embedded,
267267
})
268-
const { emitCursorUpdate } = useSocket()
268+
const { emitCursorUpdate, joinWorkflow, leaveWorkflow } = useSocket()
269269
useDynamicHandleRefresh()
270270

271271
const workspaceId = propWorkspaceId || (params.workspaceId as string)
272272
const workflowIdParam = propWorkflowId || (params.workflowId as string)
273273

274274
const addNotification = useNotificationStore((state) => state.addNotification)
275275

276+
useEffect(() => {
277+
if (!embedded || !workflowIdParam) return
278+
joinWorkflow(workflowIdParam)
279+
return () => {
280+
leaveWorkflow()
281+
}
282+
}, [embedded, workflowIdParam, joinWorkflow, leaveWorkflow])
283+
276284
useOAuthReturnForWorkflow(workflowIdParam)
277285

278286
const {
@@ -2144,12 +2152,9 @@ const WorkflowContent = React.memo(
21442152

21452153
const handleCanvasPointerMove = useCallback(
21462154
(event: React.PointerEvent<Element>) => {
2147-
const target = event.currentTarget as HTMLElement
2148-
const bounds = target.getBoundingClientRect()
2149-
21502155
const position = screenToFlowPosition({
2151-
x: event.clientX - bounds.left,
2152-
y: event.clientY - bounds.top,
2156+
x: event.clientX,
2157+
y: event.clientY,
21532158
})
21542159

21552160
emitCursorUpdate(position)

apps/sim/app/workspace/providers/socket-provider.tsx

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ interface SocketContextType {
9090
onSelectionUpdate: (handler: (data: any) => void) => void
9191
onWorkflowDeleted: (handler: (data: any) => void) => void
9292
onWorkflowReverted: (handler: (data: any) => void) => void
93+
onWorkflowUpdated: (handler: (data: any) => void) => void
9394
onOperationConfirmed: (handler: (data: any) => void) => void
9495
onOperationFailed: (handler: (data: any) => void) => void
9596
}
@@ -118,6 +119,7 @@ const SocketContext = createContext<SocketContextType>({
118119
onSelectionUpdate: () => {},
119120
onWorkflowDeleted: () => {},
120121
onWorkflowReverted: () => {},
122+
onWorkflowUpdated: () => {},
121123
onOperationConfirmed: () => {},
122124
onOperationFailed: () => {},
123125
})
@@ -155,6 +157,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
155157
selectionUpdate?: (data: any) => void
156158
workflowDeleted?: (data: any) => void
157159
workflowReverted?: (data: any) => void
160+
workflowUpdated?: (data: any) => void
158161
operationConfirmed?: (data: any) => void
159162
operationFailed?: (data: any) => void
160163
}>({})
@@ -334,7 +337,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
334337
socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => {
335338
isRejoiningRef.current = false
336339
// Ignore stale success responses from previous navigation
337-
if (workflowId !== urlWorkflowIdRef.current) {
340+
if (urlWorkflowIdRef.current && workflowId !== urlWorkflowIdRef.current) {
338341
logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`)
339342
return
340343
}
@@ -382,6 +385,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
382385
eventHandlers.current.workflowReverted?.(data)
383386
})
384387

388+
socketInstance.on('workflow-updated', (data) => {
389+
logger.info(`Workflow ${data.workflowId} has been updated externally`)
390+
eventHandlers.current.workflowUpdated?.(data)
391+
})
392+
385393
const rehydrateWorkflowStores = async (workflowId: string, workflowState: any) => {
386394
const [
387395
{ useOperationQueueStore },
@@ -803,6 +811,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
803811
eventHandlers.current.workflowReverted = handler
804812
}, [])
805813

814+
const onWorkflowUpdated = useCallback((handler: (data: any) => void) => {
815+
eventHandlers.current.workflowUpdated = handler
816+
}, [])
817+
806818
const onOperationConfirmed = useCallback((handler: (data: any) => void) => {
807819
eventHandlers.current.operationConfirmed = handler
808820
}, [])
@@ -836,6 +848,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
836848
onSelectionUpdate,
837849
onWorkflowDeleted,
838850
onWorkflowReverted,
851+
onWorkflowUpdated,
839852
onOperationConfirmed,
840853
onOperationFailed,
841854
}),
@@ -863,6 +876,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
863876
onSelectionUpdate,
864877
onWorkflowDeleted,
865878
onWorkflowReverted,
879+
onWorkflowUpdated,
866880
onOperationConfirmed,
867881
onOperationFailed,
868882
]

apps/sim/hooks/use-collaborative-workflow.ts

Lines changed: 87 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ export function useCollaborativeWorkflow() {
123123
onVariableUpdate,
124124
onWorkflowDeleted,
125125
onWorkflowReverted,
126+
onWorkflowUpdated,
126127
onOperationConfirmed,
127128
onOperationFailed,
128129
} = useSocket()
@@ -537,81 +538,99 @@ export function useCollaborativeWorkflow() {
537538
}
538539
}
539540

541+
const reloadWorkflowFromApi = async (workflowId: string, reason: string): Promise<boolean> => {
542+
const response = await fetch(`/api/workflows/${workflowId}`)
543+
if (!response.ok) {
544+
logger.error(`Failed to fetch workflow data after ${reason}: ${response.statusText}`)
545+
return false
546+
}
547+
548+
const responseData = await response.json()
549+
const workflowData = responseData.data
550+
551+
if (!workflowData?.state) {
552+
logger.error(`No state found in workflow data after ${reason}`, { workflowData })
553+
return false
554+
}
555+
556+
isApplyingRemoteChange.current = true
557+
try {
558+
useWorkflowStore.getState().replaceWorkflowState({
559+
blocks: workflowData.state.blocks || {},
560+
edges: workflowData.state.edges || [],
561+
loops: workflowData.state.loops || {},
562+
parallels: workflowData.state.parallels || {},
563+
lastSaved: workflowData.state.lastSaved || Date.now(),
564+
})
565+
566+
const subblockValues: Record<string, Record<string, any>> = {}
567+
Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => {
568+
const blockState = block as any
569+
subblockValues[blockId] = {}
570+
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
571+
subblockValues[blockId][subblockId] = (subblock as any).value
572+
})
573+
})
574+
575+
useSubBlockStore.setState((state: any) => ({
576+
workflowValues: {
577+
...state.workflowValues,
578+
[workflowId]: subblockValues,
579+
},
580+
}))
581+
582+
const graph = {
583+
blocksById: workflowData.state.blocks || {},
584+
edgesById: Object.fromEntries(
585+
(workflowData.state.edges || []).map((e: any) => [e.id, e])
586+
),
587+
}
588+
589+
const undoRedoStore = useUndoRedoStore.getState()
590+
const stackKeys = Object.keys(undoRedoStore.stacks)
591+
stackKeys.forEach((key) => {
592+
const [wfId, userId] = key.split(':')
593+
if (wfId === workflowId) {
594+
undoRedoStore.pruneInvalidEntries(wfId, userId, graph)
595+
}
596+
})
597+
598+
logger.info(`Successfully reloaded workflow state after ${reason}`, { workflowId })
599+
return true
600+
} finally {
601+
isApplyingRemoteChange.current = false
602+
}
603+
}
604+
540605
const handleWorkflowReverted = async (data: any) => {
541606
const { workflowId } = data
542607
logger.info(`Workflow ${workflowId} has been reverted to deployed state`)
543608

544-
// If the reverted workflow is the currently active one, reload the workflow state
545-
if (activeWorkflowId === workflowId) {
546-
logger.info(`Currently active workflow ${workflowId} was reverted, reloading state`)
547-
548-
try {
549-
// Fetch the updated workflow state from the server (which loads from normalized tables)
550-
const response = await fetch(`/api/workflows/${workflowId}`)
551-
if (response.ok) {
552-
const responseData = await response.json()
553-
const workflowData = responseData.data
554-
555-
if (workflowData?.state) {
556-
// Update the workflow store with the reverted state
557-
isApplyingRemoteChange.current = true
558-
try {
559-
// Update the main workflow state using the API response
560-
useWorkflowStore.getState().replaceWorkflowState({
561-
blocks: workflowData.state.blocks || {},
562-
edges: workflowData.state.edges || [],
563-
loops: workflowData.state.loops || {},
564-
parallels: workflowData.state.parallels || {},
565-
lastSaved: workflowData.state.lastSaved || Date.now(),
566-
})
609+
if (activeWorkflowId !== workflowId) return
567610

568-
// Update subblock store with reverted values
569-
const subblockValues: Record<string, Record<string, any>> = {}
570-
Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => {
571-
const blockState = block as any
572-
subblockValues[blockId] = {}
573-
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
574-
subblockValues[blockId][subblockId] = (subblock as any).value
575-
})
576-
})
611+
try {
612+
await reloadWorkflowFromApi(workflowId, 'revert')
613+
} catch (error) {
614+
logger.error('Error reloading workflow state after revert:', error)
615+
}
616+
}
577617

578-
// Update subblock store for this workflow
579-
useSubBlockStore.setState((state: any) => ({
580-
workflowValues: {
581-
...state.workflowValues,
582-
[workflowId]: subblockValues,
583-
},
584-
}))
618+
const handleWorkflowUpdated = async (data: any) => {
619+
const { workflowId } = data
620+
logger.info(`Workflow ${workflowId} has been updated externally`)
585621

586-
logger.info(`Successfully loaded reverted workflow state for ${workflowId}`)
622+
if (activeWorkflowId !== workflowId) return
587623

588-
const graph = {
589-
blocksById: workflowData.state.blocks || {},
590-
edgesById: Object.fromEntries(
591-
(workflowData.state.edges || []).map((e: any) => [e.id, e])
592-
),
593-
}
624+
const { hasActiveDiff } = useWorkflowDiffStore.getState()
625+
if (hasActiveDiff) {
626+
logger.info('Skipping workflow-updated: active diff in progress', { workflowId })
627+
return
628+
}
594629

595-
const undoRedoStore = useUndoRedoStore.getState()
596-
const stackKeys = Object.keys(undoRedoStore.stacks)
597-
stackKeys.forEach((key) => {
598-
const [wfId, userId] = key.split(':')
599-
if (wfId === workflowId) {
600-
undoRedoStore.pruneInvalidEntries(wfId, userId, graph)
601-
}
602-
})
603-
} finally {
604-
isApplyingRemoteChange.current = false
605-
}
606-
} else {
607-
logger.error('No state found in workflow data after revert', { workflowData })
608-
}
609-
} else {
610-
logger.error(`Failed to fetch workflow data after revert: ${response.statusText}`)
611-
}
612-
} catch (error) {
613-
logger.error('Error reloading workflow state after revert:', error)
614-
}
630+
try {
631+
await reloadWorkflowFromApi(workflowId, 'external update')
632+
} catch (error) {
633+
logger.error('Error reloading workflow state after external update:', error)
615634
}
616635
}
617636

@@ -633,6 +652,7 @@ export function useCollaborativeWorkflow() {
633652
onVariableUpdate(handleVariableUpdate)
634653
onWorkflowDeleted(handleWorkflowDeleted)
635654
onWorkflowReverted(handleWorkflowReverted)
655+
onWorkflowUpdated(handleWorkflowUpdated)
636656
onOperationConfirmed(handleOperationConfirmed)
637657
onOperationFailed(handleOperationFailed)
638658
}, [
@@ -641,6 +661,7 @@ export function useCollaborativeWorkflow() {
641661
onVariableUpdate,
642662
onWorkflowDeleted,
643663
onWorkflowReverted,
664+
onWorkflowUpdated,
644665
onOperationConfirmed,
645666
onOperationFailed,
646667
activeWorkflowId,

apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
type BaseServerTool,
88
type ServerToolContext,
99
} from '@/lib/copilot/tools/server/base-tool'
10+
import { env } from '@/lib/core/config/env'
1011
import { applyTargetedLayout, getTargetedLayoutImpact } from '@/lib/workflows/autolayout'
1112
import {
1213
DEFAULT_HORIZONTAL_SPACING,
@@ -287,6 +288,18 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, unknown>
287288

288289
logger.info('Workflow state persisted to database', { workflowId })
289290

291+
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
292+
fetch(`${socketUrl}/api/workflow-updated`, {
293+
method: 'POST',
294+
headers: {
295+
'Content-Type': 'application/json',
296+
'x-api-key': env.INTERNAL_API_SECRET,
297+
},
298+
body: JSON.stringify({ workflowId }),
299+
}).catch((error) => {
300+
logger.warn('Failed to notify socket server of workflow update', { workflowId, error })
301+
})
302+
290303
const sanitizationWarnings = validation.warnings.length > 0 ? validation.warnings : undefined
291304

292305
return {

0 commit comments

Comments
 (0)