1+ """
2+ title: FlowiseAI Integration
3+ author: Claude
4+ author_url: https://anthropic.com
5+ git_url: https://github.com/open-webui/pipelines/
6+ description: Access FlowiseAI endpoints with customizable flows
7+ required_open_webui_version: 0.4.3
8+ requirements: requests
9+ version: 0.4.3
10+ licence: MIT
11+ """
12+
13+ from typing import List , Union , Generator , Iterator , Dict , Optional
14+ from pydantic import BaseModel , Field
15+ import requests
16+ import os
17+ import re
18+ import json
19+ from datetime import datetime
20+ import time
21+
22+ from logging import getLogger
23+ logger = getLogger (__name__ )
24+ logger .setLevel ("DEBUG" )
25+
26+
27+ class Pipeline :
28+ class Valves (BaseModel ):
29+ API_KEY : str = Field (default = "" , description = "FlowiseAI API key" )
30+ API_URL : str = Field (default = "" , description = "FlowiseAI base URL" )
31+ RATE_LIMIT : int = Field (default = 5 , description = "Rate limit for the pipeline" )
32+
33+ FLOW_0_ENABLED : Optional [bool ] = Field (default = False , description = "Flow 0 Enabled (make this flow available for use)" )
34+ FLOW_0_ID : Optional [str ] = Field (default = None , description = "Flow 0 ID (the FlowiseAI flow identifier)" )
35+ FLOW_0_NAME : Optional [str ] = Field (default = None , description = "Flow 0 Name (human-readable name for the flow)" )
36+
37+ FLOW_1_ENABLED : Optional [bool ] = Field (default = False , description = "Flow 1 Enabled (make this flow available for use)" )
38+ FLOW_1_ID : Optional [str ] = Field (default = None , description = "Flow 1 ID (the FlowiseAI flow identifier)" )
39+ FLOW_1_NAME : Optional [str ] = Field (default = None , description = "Flow 1 Name (human-readable name for the flow)" )
40+
41+ FLOW_2_ENABLED : Optional [bool ] = Field (default = False , description = "Flow 2 Enabled (make this flow available for use)" )
42+ FLOW_2_ID : Optional [str ] = Field (default = None , description = "Flow 2 ID (the FlowiseAI flow identifier)" )
43+ FLOW_2_NAME : Optional [str ] = Field (default = None , description = "Flow 2 Name (human-readable name for the flow)" )
44+
45+ FLOW_3_ENABLED : Optional [bool ] = Field (default = False , description = "Flow 3 Enabled (make this flow available for use)" )
46+ FLOW_3_ID : Optional [str ] = Field (default = None , description = "Flow 3 ID (the FlowiseAI flow identifier)" )
47+ FLOW_3_NAME : Optional [str ] = Field (default = None , description = "Flow 3 Name (human-readable name for the flow)" )
48+
49+ FLOW_4_ENABLED : Optional [bool ] = Field (default = False , description = "Flow 4 Enabled (make this flow available for use)" )
50+ FLOW_4_ID : Optional [str ] = Field (default = None , description = "Flow 4 ID (the FlowiseAI flow identifier)" )
51+ FLOW_4_NAME : Optional [str ] = Field (default = None , description = "Flow 4 Name (human-readable name for the flow)" )
52+
53+ def __init__ (self ):
54+ self .name = "FlowiseAI Pipeline"
55+
56+ # Initialize valve parameters from environment variables
57+ self .valves = self .Valves (
58+ ** {k : os .getenv (k , v .default ) for k , v in self .Valves .model_fields .items ()}
59+ )
60+
61+ # Build flow mapping for faster lookup
62+ self .flows = {}
63+ self .update_flows ()
64+
65+ def update_flows (self ):
66+ """Update the flows dictionary based on the current valve settings"""
67+ self .flows = {}
68+ # Iterate through each flow
69+ for i in range (20 ): # Support up to 20 flows
70+ enabled_name = f"FLOW_{ i } _ENABLED"
71+ if not hasattr (self .valves , enabled_name ): # sequential numbering
72+ break
73+ enabled = getattr (self .valves , f"FLOW_{ i } _ENABLED" , False )
74+ flow_id = getattr (self .valves , f"FLOW_{ i } _ID" , None )
75+ flow_name = getattr (self .valves , f"FLOW_{ i } _NAME" , None )
76+
77+ if enabled and flow_id and flow_name :
78+ self .flows [flow_name .lower ()] = flow_id
79+
80+ logger .info (f"Updated flows: { list (self .flows .keys ())} " )
81+
82+ async def on_startup (self ):
83+ """Called when the server is started"""
84+ logger .debug (f"on_startup:{ self .name } " )
85+ self .update_flows ()
86+
87+ async def on_shutdown (self ):
88+ """Called when the server is stopped"""
89+ logger .debug (f"on_shutdown:{ self .name } " )
90+
91+ async def on_valves_updated (self ) -> None :
92+ """Called when valves are updated"""
93+ logger .debug (f"on_valves_updated:{ self .name } " )
94+ self .update_flows ()
95+
96+ def rate_check (self , dt_start : datetime ) -> bool :
97+ """
98+ Check time, sleep if not enough time has passed for rate
99+
100+ Args:
101+ dt_start (datetime): Start time of the operation
102+ Returns:
103+ bool: True if sleep was done
104+ """
105+ dt_end = datetime .now ()
106+ time_diff = (dt_end - dt_start ).total_seconds ()
107+ time_buffer = (1 / self .valves .RATE_LIMIT )
108+ if time_diff >= time_buffer : # no need to sleep
109+ return False
110+ time .sleep (time_buffer - time_diff )
111+ return True
112+
113+ def parse_user_input (self , user_message : str ) -> tuple [str , str ]:
114+ """
115+ Parse the user message to extract flow name and query
116+
117+ Format expected: @flow_name: query
118+
119+ Args:
120+ user_message (str): User's input message
121+
122+ Returns:
123+ tuple[str, str]: Flow name and query
124+ """
125+ # Match pattern @flow_name: query
126+ pattern = r"^@([^:]+):\s*(.+)$"
127+ match = re .match (pattern , user_message .strip ())
128+
129+ if not match :
130+ return None , user_message
131+
132+ flow_name = match .group (1 ).strip ().lower ()
133+ query = match .group (2 ).strip ()
134+
135+ return flow_name , query
136+
137+ def pipe (
138+ self ,
139+ user_message : str ,
140+ model_id : str ,
141+ messages : List [dict ],
142+ body : dict
143+ ) -> Union [str , Generator , Iterator ]:
144+ """
145+ Main pipeline function. Calls a specified FlowiseAI flow with the provided query.
146+
147+ Format expected: @flow_name: query
148+ If no flow is specified, a list of available flows will be returned.
149+ """
150+ logger .debug (f"pipe:{ self .name } " )
151+
152+ dt_start = datetime .now ()
153+ streaming = body .get ("stream" , False )
154+ logger .warning (f"Stream: { streaming } " )
155+ context = ""
156+
157+ # Check if we have valid API configuration
158+ if not self .valves .API_KEY or not self .valves .API_URL :
159+ error_msg = "FlowiseAI configuration missing. Please set API_KEY and API_URL valves."
160+ if streaming :
161+ yield error_msg
162+ else :
163+ return error_msg
164+
165+ # Parse the user message to extract flow name and query
166+ flow_name , query = self .parse_user_input (user_message )
167+
168+ # If no flow specified or invalid flow, list available flows
169+ if not flow_name or flow_name not in self .flows :
170+ available_flows = list (self .flows .keys ())
171+ if not available_flows :
172+ no_flows_msg = "No flows configured. Enable at least one FLOW_X_ENABLED valve and set its ID and NAME."
173+ if streaming :
174+ yield no_flows_msg
175+ else :
176+ return no_flows_msg
177+
178+ flows_list = "\n " .join ([f"- @{ flow } " for flow in available_flows ])
179+ help_msg = f"Please specify a flow using the format: @flow_name: your query\n \n Available flows:\n { flows_list } "
180+
181+ if not flow_name :
182+ help_msg = "No flow specified. " + help_msg
183+ else :
184+ help_msg = f"Invalid flow '{ flow_name } '. " + help_msg
185+
186+ if streaming :
187+ yield help_msg
188+ else :
189+ return help_msg
190+
191+ # Get the flow ID from the map
192+ flow_id = self .flows [flow_name ]
193+
194+ if streaming :
195+ yield from self .stream_retrieve (flow_id , flow_name , query , dt_start )
196+ else :
197+ for chunk in self .stream_retrieve (flow_id , flow_name , query , dt_start ):
198+ context += chunk
199+ return context if context else "No response from FlowiseAI"
200+
201+ def stream_retrieve (
202+ self , flow_id : str , flow_name : str , query : str , dt_start : datetime
203+ ) -> Generator :
204+ """
205+ Call the FlowiseAI endpoint with the specified flow ID and query.
206+
207+ Args:
208+ flow_id (str): The ID of the flow to call
209+ flow_name (str): The name of the flow (for logging)
210+ query (str): The user's query
211+ dt_start (datetime): Start time for rate limiting
212+
213+ Returns:
214+ Generator: Response chunks for streaming
215+ """
216+ if not query :
217+ yield "Query is empty. Please provide a question or prompt for the flow."
218+ return
219+
220+ api_url = f"{ self .valves .API_URL .rstrip ('/' )} /api/v1/prediction/{ flow_id } "
221+ headers = {"Authorization" : f"Bearer { self .valves .API_KEY } " }
222+
223+ payload = {
224+ "question" : query ,
225+ }
226+
227+ try :
228+ logger .info (f"Calling FlowiseAI flow '{ flow_name } ' with query: { query } " )
229+
230+ # Rate limiting check
231+ self .rate_check (dt_start )
232+
233+ response = requests .post (api_url , headers = headers , json = payload )
234+
235+ if response .status_code != 200 :
236+ error_msg = f"Error from FlowiseAI: Status { response .status_code } "
237+ logger .error (f"{ error_msg } - { response .text } " )
238+ yield error_msg
239+ return
240+
241+ try :
242+ result = response .json ()
243+
244+ # Format might vary based on flow configuration
245+ # Try common response formats
246+ if isinstance (result , dict ):
247+ if "text" in result :
248+ yield result ["text" ]
249+ elif "answer" in result :
250+ yield result ["answer" ]
251+ elif "response" in result :
252+ yield result ["response" ]
253+ elif "result" in result :
254+ yield result ["result" ]
255+ else :
256+ # If no standard field found, return full JSON as string
257+ yield f"```json\n { json .dumps (result , indent = 2 )} \n ```"
258+ elif isinstance (result , str ):
259+ yield result
260+ else :
261+ yield f"```json\n { json .dumps (result , indent = 2 )} \n ```"
262+
263+ except json .JSONDecodeError :
264+ # If not JSON, return the raw text
265+ yield response .text
266+
267+ except Exception as e :
268+ error_msg = f"Error calling FlowiseAI: { str (e )} "
269+ logger .error (error_msg )
270+ yield error_msg
271+
272+ return
0 commit comments