Skip to content

Commit 99cfc6b

Browse files
authored
Merge pull request #467 from bartonzzx/langgraph-integration
Add langgraph integration example, also support thinking.
2 parents 6c6e3bd + 48ddbec commit 99cfc6b

File tree

4 files changed

+297
-0
lines changed

4 files changed

+297
-0
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Example of langgraph integration
2+
## Python version: 3.11
3+
## Feature
4+
1. Using langgraph stream writer and custom mode of stream to integrate langgraph with open webui pipeline.
5+
2. Support \<think\> block display.
6+
## Prerequirement
7+
Install the open webui pipeline.
8+
You can follow the docs : https://docs.openwebui.com/pipelines/#-quick-start-with-docker
9+
10+
## Usage
11+
### 1. Upload pipeline file
12+
Upload `langgraph_stream_pipeline.py` to the open webui pipeline.
13+
14+
### 2. Enable the uploaded pipeline
15+
Properly set up your langgraph api url.
16+
17+
And choose **"LangGraph stream"** as your model.
18+
19+
### 2. Install dependencies
20+
Under the folder `pipelines/examples/pipelines/integrations/langgraph_pipeline`, run command below :
21+
```
22+
pip install -r requirements.txt
23+
```
24+
### 3. Start langgraph api server
25+
Run command below :
26+
```
27+
uvicorn langgraph_example:app --reload
28+
```
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
"""
2+
title: Langgraph stream integration
3+
author: bartonzzx
4+
author_url: https://github.com/bartonzzx
5+
git_url:
6+
description: Integrate langgraph with open webui pipeline
7+
required_open_webui_version: 0.4.3
8+
requirements: none
9+
version: 0.4.3
10+
licence: MIT
11+
"""
12+
13+
14+
import os
15+
import json
16+
import getpass
17+
from typing import Annotated, Literal
18+
from typing_extensions import TypedDict
19+
20+
from fastapi import FastAPI
21+
from fastapi.responses import StreamingResponse
22+
23+
from langgraph.graph import StateGraph, START, END
24+
from langgraph.graph.message import add_messages
25+
from langchain_openai import ChatOpenAI
26+
from langgraph.config import get_stream_writer
27+
28+
29+
'''
30+
Define LLM API key
31+
'''
32+
def _set_env(var: str):
33+
if not os.environ.get(var):
34+
os.environ[var] = getpass.getpass(f"{var}: ")
35+
36+
37+
_set_env("OPENAI_API_KEY")
38+
39+
40+
'''
41+
Define Langgraph
42+
'''
43+
def generate_custom_stream(type: Literal["think","normal"], content: str):
44+
content = "\n"+content+"\n"
45+
custom_stream_writer = get_stream_writer()
46+
return custom_stream_writer({type:content})
47+
48+
class State(TypedDict):
49+
messages: Annotated[list, add_messages]
50+
51+
llm = ChatOpenAI(model="gpt-3.5-turbo")
52+
53+
def chatbot(state: State):
54+
think_response = llm.invoke(["Please reasoning:"] + state["messages"])
55+
normal_response = llm.invoke(state["messages"])
56+
generate_custom_stream("think", think_response.content)
57+
generate_custom_stream("normal", normal_response.content)
58+
return {"messages": [normal_response]}
59+
60+
# Define graph
61+
graph_builder = StateGraph(State)
62+
63+
# Define nodes
64+
graph_builder.add_node("chatbot", chatbot)
65+
graph_builder.add_edge("chatbot", END)
66+
67+
# Define edges
68+
graph_builder.add_edge(START, "chatbot")
69+
70+
# Compile graph
71+
graph = graph_builder.compile()
72+
73+
74+
'''
75+
Define api processing
76+
'''
77+
app = FastAPI(
78+
title="Langgraph API",
79+
description="Langgraph API",
80+
)
81+
82+
@app.get("/test")
83+
async def test():
84+
return {"message": "Hello World"}
85+
86+
87+
@app.post("/stream")
88+
async def stream(inputs: State):
89+
async def event_stream():
90+
try:
91+
stream_start_msg = {
92+
'choices':
93+
[
94+
{
95+
'delta': {},
96+
'finish_reason': None
97+
}
98+
]
99+
}
100+
101+
# Stream start
102+
yield f"data: {json.dumps(stream_start_msg)}\n\n"
103+
104+
# Processing langgraph stream response with <think> block support
105+
async for event in graph.astream(input=inputs, stream_mode="custom"):
106+
print(event)
107+
think_content = event.get("think", None)
108+
normal_content = event.get("normal", None)
109+
110+
think_msg = {
111+
'choices':
112+
[
113+
{
114+
'delta':
115+
{
116+
'reasoning_content': think_content,
117+
},
118+
'finish_reason': None
119+
}
120+
]
121+
}
122+
123+
normal_msg = {
124+
'choices':
125+
[
126+
{
127+
'delta':
128+
{
129+
'content': normal_content,
130+
},
131+
'finish_reason': None
132+
}
133+
]
134+
}
135+
136+
yield f"data: {json.dumps(think_msg)}\n\n"
137+
yield f"data: {json.dumps(normal_msg)}\n\n"
138+
139+
# End of the stream
140+
stream_end_msg = {
141+
'choices': [
142+
{
143+
'delta': {},
144+
'finish_reason': 'stop'
145+
}
146+
]
147+
}
148+
yield f"data: {json.dumps(stream_end_msg)}\n\n"
149+
150+
except Exception as e:
151+
# Simply print the error information
152+
print(f"An error occurred: {e}")
153+
154+
return StreamingResponse(
155+
event_stream(),
156+
media_type="text/event-stream",
157+
headers={
158+
"Cache-Control": "no-cache",
159+
"Connection": "keep-alive",
160+
}
161+
)
162+
163+
if __name__ == "__main__":
164+
import uvicorn
165+
166+
uvicorn.run(app, host="0.0.0.0", port=9000)
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""
2+
title: Langgraph stream integration
3+
author: bartonzzx
4+
author_url: https://github.com/bartonzzx
5+
git_url:
6+
description: Integrate langgraph with open webui pipeline
7+
required_open_webui_version: 0.4.3
8+
requirements: none
9+
version: 0.4.3
10+
licence: MIT
11+
"""
12+
13+
14+
import os
15+
import requests
16+
from pydantic import BaseModel, Field
17+
from typing import List, Union, Generator, Iterator
18+
19+
20+
class Pipeline:
21+
class Valves(BaseModel):
22+
API_URL: str = Field(default="http://127.0.0.1:9000/stream", description="Langgraph API URL")
23+
24+
def __init__(self):
25+
self.id = "LangGraph stream"
26+
self.name = "LangGraph stream"
27+
# Initialize valve paramaters
28+
self.valves = self.Valves(
29+
**{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
30+
)
31+
32+
async def on_startup(self):
33+
# This function is called when the server is started.
34+
print(f"on_startup: {__name__}")
35+
pass
36+
37+
async def on_shutdown(self):
38+
# This function is called when the server is shutdown.
39+
print(f"on_shutdown: {__name__}")
40+
pass
41+
42+
def pipe(
43+
self,
44+
user_message: str,
45+
model_id: str,
46+
messages: List[dict],
47+
body: dict
48+
) -> Union[str, Generator, Iterator]:
49+
50+
data = {
51+
"messages": [[msg['role'], msg['content']] for msg in messages],
52+
}
53+
54+
headers = {
55+
'accept': 'text/event-stream',
56+
'Content-Type': 'application/json',
57+
}
58+
59+
response = requests.post(self.valves.API_URL, json=data, headers=headers, stream=True)
60+
61+
response.raise_for_status()
62+
63+
return response.iter_lines()
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
annotated-types==0.7.0
2+
anyio==4.8.0
3+
certifi==2025.1.31
4+
charset-normalizer==3.4.1
5+
click==8.1.8
6+
distro==1.9.0
7+
fastapi==0.115.11
8+
h11==0.14.0
9+
httpcore==1.0.7
10+
httpx==0.28.1
11+
idna==3.10
12+
jiter==0.9.0
13+
jsonpatch==1.33
14+
jsonpointer==3.0.0
15+
langchain-core==0.3.45
16+
langchain-openai==0.3.8
17+
langgraph==0.3.11
18+
langgraph-checkpoint==2.0.20
19+
langgraph-prebuilt==0.1.3
20+
langgraph-sdk==0.1.57
21+
langsmith==0.3.15
22+
msgpack==1.1.0
23+
openai==1.66.3
24+
orjson==3.10.15
25+
packaging==24.2
26+
pydantic==2.10.6
27+
pydantic_core==2.27.2
28+
PyYAML==6.0.2
29+
regex==2024.11.6
30+
requests==2.32.3
31+
requests-toolbelt==1.0.0
32+
sniffio==1.3.1
33+
starlette==0.46.1
34+
tenacity==9.0.0
35+
tiktoken==0.9.0
36+
tqdm==4.67.1
37+
typing_extensions==4.12.2
38+
urllib3==2.3.0
39+
uvicorn==0.34.0
40+
zstandard==0.23.0

0 commit comments

Comments
 (0)