Skip to content

Commit cb5a16a

Browse files
committed
fix(flowise): add streaming capability to flowise integration
1 parent 5e1f90d commit cb5a16a

File tree

1 file changed

+146
-26
lines changed

1 file changed

+146
-26
lines changed

examples/pipelines/integrations/flowise_pipeline.py

Lines changed: 146 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
git_url: https://github.com/open-webui/pipelines/
66
description: Access FlowiseAI endpoints with customizable flows
77
required_open_webui_version: 0.4.3
8-
requirements: requests
8+
requirements: requests,flowise>=1.0.4
99
version: 0.4.3
1010
licence: MIT
1111
"""
@@ -18,6 +18,7 @@
1818
import json
1919
from datetime import datetime
2020
import time
21+
from flowise import Flowise, PredictionData
2122

2223
from logging import getLogger
2324
logger = getLogger(__name__)
@@ -26,29 +27,51 @@
2627

2728
class Pipeline:
2829
class Valves(BaseModel):
29-
API_KEY: str = Field(default="", description="FlowiseAI API key")
30-
API_URL: str = Field(default="", description="FlowiseAI base URL")
31-
RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline")
30+
FLOWISE_API_KEY: str = Field(default="", description="FlowiseAI API key (from Bearer key, e.g. QMknVTFTB40Pk23n6KIVRgdB7va2o-Xlx73zEfpeOu0)")
31+
FLOWISE_BASE_URL: str = Field(default="", description="FlowiseAI base URL (e.g. http://localhost:3000 (URL before '/api/v1/prediction'))")
32+
RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline (ops/minute)")
3233

3334
FLOW_0_ENABLED: Optional[bool] = Field(default=False, description="Flow 0 Enabled (make this flow available for use)")
34-
FLOW_0_ID: Optional[str] = Field(default=None, description="Flow 0 ID (the FlowiseAI flow identifier)")
35-
FLOW_0_NAME: Optional[str] = Field(default=None, description="Flow 0 Name (human-readable name for the flow)")
35+
FLOW_0_ID: Optional[str] = Field(default=None, description="Flow 0 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
36+
FLOW_0_NAME: Optional[str] = Field(default=None, description="Flow 0 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
3637

3738
FLOW_1_ENABLED: Optional[bool] = Field(default=False, description="Flow 1 Enabled (make this flow available for use)")
38-
FLOW_1_ID: Optional[str] = Field(default=None, description="Flow 1 ID (the FlowiseAI flow identifier)")
39-
FLOW_1_NAME: Optional[str] = Field(default=None, description="Flow 1 Name (human-readable name for the flow)")
39+
FLOW_1_ID: Optional[str] = Field(default=None, description="Flow 1 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
40+
FLOW_1_NAME: Optional[str] = Field(default=None, description="Flow 1 Name (human-readable flwo name, no special characters, e.g. news or stock-reader)")
4041

4142
FLOW_2_ENABLED: Optional[bool] = Field(default=False, description="Flow 2 Enabled (make this flow available for use)")
42-
FLOW_2_ID: Optional[str] = Field(default=None, description="Flow 2 ID (the FlowiseAI flow identifier)")
43-
FLOW_2_NAME: Optional[str] = Field(default=None, description="Flow 2 Name (human-readable name for the flow)")
43+
FLOW_2_ID: Optional[str] = Field(default=None, description="Flow 2 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
44+
FLOW_2_NAME: Optional[str] = Field(default=None, description="Flow 2 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
4445

4546
FLOW_3_ENABLED: Optional[bool] = Field(default=False, description="Flow 3 Enabled (make this flow available for use)")
46-
FLOW_3_ID: Optional[str] = Field(default=None, description="Flow 3 ID (the FlowiseAI flow identifier)")
47-
FLOW_3_NAME: Optional[str] = Field(default=None, description="Flow 3 Name (human-readable name for the flow)")
47+
FLOW_3_ID: Optional[str] = Field(default=None, description="Flow 3 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
48+
FLOW_3_NAME: Optional[str] = Field(default=None, description="Flow 3 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
4849

4950
FLOW_4_ENABLED: Optional[bool] = Field(default=False, description="Flow 4 Enabled (make this flow available for use)")
50-
FLOW_4_ID: Optional[str] = Field(default=None, description="Flow 4 ID (the FlowiseAI flow identifier)")
51-
FLOW_4_NAME: Optional[str] = Field(default=None, description="Flow 4 Name (human-readable name for the flow)")
51+
FLOW_4_ID: Optional[str] = Field(default=None, description="Flow 4 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
52+
FLOW_4_NAME: Optional[str] = Field(default=None, description="Flow 4 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
53+
54+
FLOW_5_ENABLED: Optional[bool] = Field(default=False, description="Flow 5 Enabled (make this flow available for use)")
55+
FLOW_5_ID: Optional[str] = Field(default=None, description="Flow 5 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
56+
FLOW_5_NAME: Optional[str] = Field(default=None, description="Flow 5 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
57+
58+
FLOW_6_ENABLED: Optional[bool] = Field(default=False, description="Flow 6 Enabled (make this flow available for use)")
59+
FLOW_6_ID: Optional[str] = Field(default=None, description="Flow 6 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
60+
FLOW_6_NAME: Optional[str] = Field(default=None, description="Flow 6 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
61+
62+
FLOW_7_ENABLED: Optional[bool] = Field(default=False, description="Flow 7 Enabled (make this flow available for use)")
63+
FLOW_7_ID: Optional[str] = Field(default=None, description="Flow 7 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
64+
FLOW_7_NAME: Optional[str] = Field(default=None, description="Flow 7 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
65+
66+
FLOW_8_ENABLED: Optional[bool] = Field(default=False, description="Flow 8 Enabled (make this flow available for use)")
67+
FLOW_8_ID: Optional[str] = Field(default=None, description="Flow 8 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
68+
FLOW_8_NAME: Optional[str] = Field(default=None, description="Flow 8 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
69+
70+
FLOW_9_ENABLED: Optional[bool] = Field(default=False, description="Flow 9 Enabled (make this flow available for use)")
71+
FLOW_9_ID: Optional[str] = Field(default=None, description="Flow 9 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
72+
FLOW_9_NAME: Optional[str] = Field(default=None, description="Flow 9 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
73+
74+
5275

5376
def __init__(self):
5477
self.name = "FlowiseAI Pipeline"
@@ -122,15 +145,19 @@ def parse_user_input(self, user_message: str) -> tuple[str, str]:
122145
Returns:
123146
tuple[str, str]: Flow name and query
124147
"""
125-
# Match pattern @flow_name: query
126-
pattern = r"^@([^:]+):\s*(.+)$"
148+
# Match pattern flow_name: query
149+
pattern = r"^([^:]+):\s*(.+)$"
127150
match = re.match(pattern, user_message.strip())
128151

129152
if not match:
130153
return None, user_message
131154

132155
flow_name = match.group(1).strip().lower()
133156
query = match.group(2).strip()
157+
158+
date_now = datetime.now().strftime("%Y-%m-%d")
159+
time_now = datetime.now().strftime("%H:%M:%S")
160+
query = f"{query}; today's date is {date_now} and the current time is {time_now}"
134161

135162
return flow_name, query
136163

@@ -155,8 +182,8 @@ def pipe(
155182
context = ""
156183

157184
# Check if we have valid API configuration
158-
if not self.valves.API_KEY or not self.valves.API_URL:
159-
error_msg = "FlowiseAI configuration missing. Please set API_KEY and API_URL valves."
185+
if not self.valves.FLOWISE_API_KEY or not self.valves.FLOWISE_BASE_URL:
186+
error_msg = "FlowiseAI configuration missing. Please set FLOWISE_API_KEY and FLOWISE_BASE_URL valves."
160187
if streaming:
161188
yield error_msg
162189
else:
@@ -166,7 +193,7 @@ def pipe(
166193
flow_name, query = self.parse_user_input(user_message)
167194

168195
# If no flow specified or invalid flow, list available flows
169-
if not flow_name or flow_name not in self.flows:
196+
if flow_name is None or flow_name not in self.flows:
170197
available_flows = list(self.flows.keys())
171198
if not available_flows:
172199
no_flows_msg = "No flows configured. Enable at least one FLOW_X_ENABLED valve and set its ID and NAME."
@@ -175,16 +202,17 @@ def pipe(
175202
else:
176203
return no_flows_msg
177204

178-
flows_list = "\n".join([f"- @{flow}" for flow in available_flows])
179-
help_msg = f"Please specify a flow using the format: @flow_name: your query\n\nAvailable flows:\n{flows_list}"
205+
flows_list = "\n".join([f"- {flow}" for flow in available_flows])
206+
help_msg = f"Please specify a flow using the format: flow_name: your query\n\nAvailable flows:\n{flows_list}"
180207

181-
if not flow_name:
208+
if flow_name is None:
182209
help_msg = "No flow specified. " + help_msg
183210
else:
184211
help_msg = f"Invalid flow '{flow_name}'. " + help_msg
185212

186213
if streaming:
187214
yield help_msg
215+
return
188216
else:
189217
return help_msg
190218

@@ -194,15 +222,15 @@ def pipe(
194222
if streaming:
195223
yield from self.stream_retrieve(flow_id, flow_name, query, dt_start)
196224
else:
197-
for chunk in self.stream_retrieve(flow_id, flow_name, query, dt_start):
225+
for chunk in self.static_retrieve(flow_id, flow_name, query, dt_start):
198226
context += chunk
199227
return context if context else "No response from FlowiseAI"
200228

201229
def stream_retrieve(
202230
self, flow_id: str, flow_name: str, query: str, dt_start: datetime
203231
) -> Generator:
204232
"""
205-
Call the FlowiseAI endpoint with the specified flow ID and query.
233+
Stream responses from FlowiseAI using the official client library.
206234
207235
Args:
208236
flow_id (str): The ID of the flow to call
@@ -216,9 +244,101 @@ def stream_retrieve(
216244
if not query:
217245
yield "Query is empty. Please provide a question or prompt for the flow."
218246
return
247+
248+
try:
249+
logger.info(f"Streaming from FlowiseAI flow '{flow_name}' with query: {query}")
250+
251+
# Rate limiting check
252+
self.rate_check(dt_start)
253+
254+
# Initialize Flowise client with API configuration
255+
client = Flowise(
256+
base_url=self.valves.FLOWISE_BASE_URL.rstrip('/'),
257+
api_key=self.valves.FLOWISE_API_KEY
258+
)
259+
260+
# Create streaming prediction request
261+
completion = client.create_prediction(
262+
PredictionData(
263+
chatflowId=flow_id,
264+
question=query,
265+
streaming=True
266+
)
267+
)
268+
269+
except Exception as e:
270+
error_msg = f"Error streaming from FlowiseAI: {str(e)}"
271+
logger.error(error_msg)
272+
yield error_msg
273+
274+
idx_last_update = 0
275+
yield f"Analysis started... {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
276+
277+
# Process each streamed chunk
278+
for chunk in completion:
279+
try:
280+
if isinstance(chunk, str):
281+
chunk = json.loads(chunk)
282+
except Exception as e:
283+
# If chunk is not a string, it's already a dictionary
284+
pass
285+
286+
try:
287+
if isinstance(chunk, dict):
288+
# Expected format: {event: "token", data: "content"}
289+
if "event" in chunk:
290+
if ((chunk["event"] in ["start", "update", "agentReasoning"]) and
291+
("data" in chunk) and (isinstance(chunk["data"], list))):
292+
for data_update in chunk["data"][idx_last_update:]:
293+
# e.g. {"event":"start","data":[{"agentName":"Perspective Explorer","messages":["...
294+
idx_last_update += 1
295+
yield "\n---\n"
296+
yield f"\n__Reasoning: {data_update['agentName']} ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})__\n\n"
297+
for message in data_update["messages"]:
298+
yield message # yield message for each agent update
299+
elif chunk["event"] == "end":
300+
# {"event":"end","data":"[DONE]"}
301+
yield "\n---\n"
302+
yield f"\nAnalysis complete. ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})\n\n"
303+
elif chunk["event"] == "token":
304+
# do nothing, this is the flat output of the flow (final)
305+
pass
306+
elif "error" in chunk:
307+
error_msg = f"Error from FlowiseAI: {chunk['error']}"
308+
logger.error(error_msg)
309+
yield error_msg
310+
else:
311+
# If chunk format is unexpected, yield as is
312+
yield str(chunk)
313+
except Exception as e:
314+
logger.error(f"Error processing chunk: {str(e)}")
315+
yield f"\nUnusual Response Chunk: ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})\n{str(e)}\n"
316+
yield f"\n---\n"
317+
yield str(chunk)
318+
319+
return
320+
321+
def static_retrieve(
322+
self, flow_id: str, flow_name: str, query: str, dt_start: datetime
323+
) -> Generator:
324+
"""
325+
Call the FlowiseAI endpoint with the specified flow ID and query using REST API.
326+
327+
Args:
328+
flow_id (str): The ID of the flow to call
329+
flow_name (str): The name of the flow (for logging)
330+
query (str): The user's query
331+
dt_start (datetime): Start time for rate limiting
332+
333+
Returns:
334+
Generator: Response chunks for non-streaming requests
335+
"""
336+
if not query:
337+
yield "Query is empty. Please provide a question or prompt for the flow."
338+
return
219339

220-
api_url = f"{self.valves.API_URL.rstrip('/')}/api/v1/prediction/{flow_id}"
221-
headers = {"Authorization": f"Bearer {self.valves.API_KEY}"}
340+
api_url = f"{self.valves.FLOWISE_BASE_URL.rstrip('/')}/api/v1/prediction/{flow_id}"
341+
headers = {"Authorization": f"Bearer {self.valves.FLOWISE_API_KEY}"}
222342

223343
payload = {
224344
"question": query,

0 commit comments

Comments
 (0)