1+ """
2+ title: AWS Bedrock DeepSeek Pipeline
3+ author: kikumoto
4+ date: 2025-03-17
5+ version: 1.0
6+ license: MIT
7+ description: A pipeline for generating text using the AWS Bedrock API.
8+ requirements: boto3
9+ environment_variables:
10+ """

import json
import logging
import os
from typing import Generator, Iterator, List, Optional, Union

import boto3
from pydantic import BaseModel

from utils.pipelines.main import pop_system_message

class Pipeline:
    class Valves(BaseModel):
        AWS_ACCESS_KEY: Optional[str] = None
        AWS_SECRET_KEY: Optional[str] = None
        AWS_REGION_NAME: Optional[str] = None

    def __init__(self):
        self.type = "manifold"
        self.name = "Bedrock DeepSeek: "

        self.valves = self.Valves(
            **{
                "AWS_ACCESS_KEY": os.getenv("AWS_ACCESS_KEY", ""),
                "AWS_SECRET_KEY": os.getenv("AWS_SECRET_KEY", ""),
                "AWS_REGION_NAME": os.getenv(
                    "AWS_REGION_NAME",
                    os.getenv("AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "")),
                ),
            }
        )

        self.update_pipelines()

    async def on_startup(self):
        # This function is called when the server is started.
        print(f"on_startup:{__name__}")
        self.update_pipelines()

    async def on_shutdown(self):
        # This function is called when the server is stopped.
        print(f"on_shutdown:{__name__}")

    async def on_valves_updated(self):
        # This function is called when the valves are updated.
        print(f"on_valves_updated:{__name__}")
        self.update_pipelines()

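    # Create the Bedrock control-plane client (model discovery) and the
    # bedrock-runtime client (Converse API), then refresh the model list.
    # On failure, a placeholder "error" entry is published instead.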
    def update_pipelines(self) -> None:
        try:
            self.bedrock = boto3.client(
                service_name="bedrock",
                aws_access_key_id=self.valves.AWS_ACCESS_KEY,
                aws_secret_access_key=self.valves.AWS_SECRET_KEY,
                region_name=self.valves.AWS_REGION_NAME,
            )
            self.bedrock_runtime = boto3.client(
                service_name="bedrock-runtime",
                aws_access_key_id=self.valves.AWS_ACCESS_KEY,
                aws_secret_access_key=self.valves.AWS_SECRET_KEY,
                region_name=self.valves.AWS_REGION_NAME,
            )
            self.pipelines = self.get_models()
        except Exception as e:
            print(f"Error: {e}")
            self.pipelines = [
                {
                    "id": "error",
                    "name": "Could not fetch models from Bedrock, please set up AWS Key/Secret or Instance/Task Role.",
                },
            ]

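    # Only models that can actually be invoked are listed: ON_DEMAND models are
    # registered under their modelId, while models that require an inference
    # profile are registered under the matching inferenceProfileId.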
    def get_models(self):
        try:
            res = []
            response = self.bedrock.list_foundation_models(byProvider="DeepSeek")
            for model in response["modelSummaries"]:
                inference_types = model.get("inferenceTypesSupported", [])
                if "ON_DEMAND" in inference_types:
                    res.append({"id": model["modelId"], "name": model["modelName"]})
                elif "INFERENCE_PROFILE" in inference_types:
                    inference_profile_id = self.getInferenceProfileId(model["modelArn"])
                    if inference_profile_id:
                        res.append({"id": inference_profile_id, "name": model["modelName"]})

            return res
        except Exception as e:
            print(f"Error: {e}")
            return [
                {
                    "id": "error",
                    "name": "Could not fetch models from Bedrock, please check permissions.",
                },
            ]

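    # Models that only support INFERENCE_PROFILE invocation must be called via
    # their inference profile rather than the model itself, so the model ARN is
    # mapped back to the profile ID that the Converse API accepts.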
    def getInferenceProfileId(self, modelArn: str) -> Optional[str]:
        response = self.bedrock.list_inference_profiles()
        for profile in response.get("inferenceProfileSummaries", []):
            for model in profile.get("models", []):
                if model.get("modelArn") == modelArn:
                    return profile["inferenceProfileId"]
        return None

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        # This is where you can add your custom pipelines like RAG.
        print(f"pipe:{__name__}")

        try:
            # Remove keys that the Bedrock API does not accept
            for key in ["user", "chat_id", "title"]:
                body.pop(key, None)

            system_message, messages = pop_system_message(messages)

            logging.info(f"pop_system_message: {json.dumps(messages)}")

            processed_messages = []
            for message in messages:
                processed_content = []
                if isinstance(message.get("content"), list):
                    for item in message["content"]:
                        # DeepSeek currently doesn't support multi-modal inputs,
                        # so only text parts are forwarded.
                        if item["type"] == "text":
                            processed_content.append({"text": item["text"]})
                else:
                    processed_content = [{"text": message.get("content", "")}]

                processed_messages.append(
                    {"role": message["role"], "content": processed_content}
                )

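            # Build the request in the Bedrock Converse API schema: a system
            # block, the conversation history, and the sampling parameters
            # mapped from the OpenAI-style request body.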
            payload = {
                "modelId": model_id,
                "system": [
                    {
                        "text": system_message["content"]
                        if system_message
                        else "You are an intelligent AI assistant."
                    }
                ],
                "messages": processed_messages,
                "inferenceConfig": {
                    "temperature": body.get("temperature", 0.5),
                    "topP": body.get("top_p", 0.9),
                    "maxTokens": body.get("max_tokens", 8192),
                    "stopSequences": body.get("stop", []),
                },
            }

            if body.get("stream", False):
                return self.stream_response(model_id, payload)
            else:
                return self.get_completion(model_id, payload)

        except Exception as e:
            return f"Error: {e}"

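    # DeepSeek-R1 emits its chain-of-thought as `reasoningContent` deltas in
    # the Converse stream. That span is wrapped in <think>...</think> tags so
    # front ends such as Open WebUI can render it as a separate reasoning block.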
    def stream_response(self, model_id: str, payload: dict) -> Generator:
        streaming_response = self.bedrock_runtime.converse_stream(**payload)

        in_reasoning_context = False
        for chunk in streaming_response["stream"]:
            if in_reasoning_context and "contentBlockStop" in chunk:
                # The reasoning content block has ended; close the tag.
                in_reasoning_context = False
                yield "\n</think>\n\n"
            elif "contentBlockDelta" in chunk and "delta" in chunk["contentBlockDelta"]:
                delta = chunk["contentBlockDelta"]["delta"]
                if "reasoningContent" in delta:
                    if not in_reasoning_context:
                        yield "<think>"
                    in_reasoning_context = True
                    if "text" in delta["reasoningContent"]:
                        yield delta["reasoningContent"]["text"]
                elif "text" in delta:
                    yield delta["text"]

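    # Non-streaming path: a single Converse call, returning the text of the
    # first content block of the model's reply.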
    def get_completion(self, model_id: str, payload: dict) -> str:
        response = self.bedrock_runtime.converse(**payload)
        return response["output"]["message"]["content"][0]["text"]