diff --git a/service/banking/core_banking_routes_v2.py b/service/banking/core_banking_routes_v2.py new file mode 100644 index 0000000..bf11107 --- /dev/null +++ b/service/banking/core_banking_routes_v2.py @@ -0,0 +1,445 @@ +from fastapi import APIRouter, Depends, HTTPException, Response, status +from sqlalchemy.orm import Session +from sqlalchemy import desc +from datetime import datetime, timedelta +from .database import get_db +from .models import Customer, Account, Transaction, Beneficiary +from pydantic import BaseModel +from typing import Optional, List +from fastapi.responses import JSONResponse + +router = APIRouter(prefix="/bank/me/v2", tags=["banking"]) + +def normalize_text(text: str) -> str: + """Normalize text for beneficiary matching by converting to lowercase, removing spaces, + and replacing textual numbers with digits.""" + if not text: + return "" + + # Convert to lowercase + normalized = text.lower() + + # Remove spaces + normalized = normalized.replace(" ", "") + + # Replace textual numbers with digits + number_replacements = { + "zero": "0", "one": "1", "two": "2", "three": "3", "four": "4", + "five": "5", "six": "6", "seven": "7", "eight": "8", "nine": "9", + "ten": "10" + } + + for word, digit in number_replacements.items(): + normalized = normalized.replace(word, digit) + + return normalized + +class PaymentRequest(BaseModel): + to: str + amount: float + transaction_type: Optional[str] = None + payment_method: Optional[str] = None + category: Optional[str] = None + otp: Optional[str] = None + +def format_contact_details(contacts, limit=None): + """Helper function to format contact details for error messages""" + details = [] + for b in contacts: + identifier = f"'{b.name}'" + if b.nickname: + identifier += f" (nickname: {b.nickname})" + if b.tag: + identifier += f" (tag: {b.tag})" + details.append(identifier) + + if limit and len(details) > limit: + displayed = details[:limit] + more_count = len(details) - limit + return f"{', '.join(displayed)} and {more_count} more" + return ', '.join(details) + +def find_beneficiary(db: Session, customer_id: int, to: str): + """Find a beneficiary by name, nickname, or tag with smart conflict handling.""" + # Normalize the search query + normalized_to = normalize_text(to) + + # Get all beneficiaries for this customer + all_beneficiaries = db.query(Beneficiary).filter( + Beneficiary.customer_id == customer_id + ).all() + + # Helper function to format beneficiary list based on differences + def format_beneficiaries(matches): + same_name = len(set(b.name for b in matches)) == 1 + nicknames = [b.nickname for b in matches if b.nickname] + same_nickname = len(set(nicknames)) == 1 if nicknames else False + + result = [] + if same_name and nicknames and not same_nickname: + # Names are same, nicknames different - show nicknames + key_field = "nickname" + elif same_name and (not nicknames or same_nickname): + # Names same, no nicknames or same nicknames - show tags + key_field = "tag" + else: + # Different names - show actual names + key_field = "name" + + for b in matches: + result.append({ + "id": b.id, + "name": getattr(b, key_field) or "", + "account_number": b.account_number, + }) + return result + + # First, try exact matches on each field using normalized comparison + for field in ["name", "nickname", "tag"]: + matches = [] + for beneficiary in all_beneficiaries: + field_value = getattr(beneficiary, field) + if field_value and normalized_to in normalize_text(field_value): + matches.append(beneficiary) + + if matches: + if len(matches) == 1: + return matches[0] + + # Multiple matches - return formatted list + beneficiary_list = format_beneficiaries(matches) + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail={ + "status": "duplicate", + "message": f"Multiple beneficiaries found matching '{to}'. Please confirm the correct beneficiary", + "beneficiaries": beneficiary_list + } + ) + + # No exact matches found, try partial matches using normalized comparison + matches = [] + for beneficiary in all_beneficiaries: + # Check if normalized search term is contained in any normalized field + if any( + getattr(beneficiary, field) and + normalized_to in normalize_text(getattr(beneficiary, field)) + for field in ["name", "nickname", "tag"] + ): + matches.append(beneficiary) + + if not matches: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Beneficiary '{to}' does not exist in your account. Please check the beneficiary name or add them as a new contact before sending money." + ) + + if len(matches) > 1: + # Multiple partial matches - return formatted list + beneficiary_list = format_beneficiaries(matches) + return HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail={ + "status": "duplicate", + "message": f"Multiple beneficiaries found matching '{to}'. Please confirm the correct beneficiary", + "beneficiaries": beneficiary_list + } + ) + + return matches[0] + +@router.get("/balance") +async def get_balance( + customer_id: int = None, + phone: str = None, + db: Session = Depends(get_db) +): + """Get balance for a customer account (by customer_id or phone)""" + if phone: + customer = db.query(Customer).filter(Customer.phone == phone).first() + if not customer: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Invalid phone number '{phone}'. Please check the number and try again." + ) + customer_id = customer.id + else: + customer = db.query(Customer).filter(Customer.id == customer_id).first() + if not customer: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Invalid customer ID {customer_id}." + ) + + if not customer_id: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="To know your balance, please provide either a customer ID or a registered phone number." + ) + + account = db.query(Account).filter( + Account.customer_id == customer_id, + Account.is_active == True + ).first() + + if not account: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"No active account was found for customer ID {customer_id}. If you believe this is an error, please contact customer support." + ) + + data = { + "balance": account.balance, + "customer_id": customer_id, + "customer_name": customer.name + } + + return { + "status": "success", + "message": f"Balance retrieved successfully for {customer.name}.", + "data": data + } + +@router.post("/pay") +async def pay_money( + request: PaymentRequest, + customer_id: int = None, + phone: str = None, + db: Session = Depends(get_db) +): + """Send money to a merchant or contact""" + if phone: + customer = db.query(Customer).filter(Customer.phone == phone).first() + if not customer: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Incorrect Phone number '{phone}'. Please check the number and try again." + ) + customer_id = customer.id + + if not customer_id: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="To transfer money, please provide either a customer ID or a registered phone number." + ) + print(request) + to = request.to + amount = request.amount + transaction_type = request.transaction_type or "debit" + payment_method = request.payment_method or "upi" + category = request.category + otp = request.otp + + beneficiary = find_beneficiary(db, customer_id, to) + account = db.query(Account).filter( + Account.customer_id == customer_id, + Account.is_active == True + ).first() + + if not account: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Customer ID {customer_id} is not valid. If you believe this is an error, please contact customer support." + ) + + if amount > account.balance: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Insufficient balance, your account balance of ₹{account.balance:.2f} is not enough to complete this transaction of ₹{amount:.2f}." + ) + + if not otp: + return { + "status": "otp", + "message": f"Please confirm the transaction ₹{amount:.2f} to {beneficiary.name} by entering the OTP sent to your registered mobile number.", + "data": {} + } + + # Category mapping + food_merchants = ["swiggy", "zomato", "restaurant"] + ecommerce_merchants = ["amazon", "myntra", "flipkart"] + utility_merchants = ["electricity", "water", "gas", "mobile"] + + if category: + category_lower = category.lower() + if any(m in category_lower for m in food_merchants): + category = "food" + elif any(m in category_lower for m in ecommerce_merchants): + category = "e-commerce" + elif any(m in category_lower for m in utility_merchants): + category = "utility" + else: + category = "individual" + else: + category = "individual" + + account.balance -= amount + + reference_id = f"TXN-{datetime.now().strftime('%Y%m%d%H%M%S')}" + transaction = Transaction( + transaction_type=transaction_type, + amount=amount, + recipient=beneficiary.name, + reference_id=reference_id, + payment_method=payment_method, + category=category, + from_account_id=account.id, + transaction_date=datetime.now() + ) + + db.add(transaction) + db.commit() + db.refresh(transaction) + + current_datetime = datetime.now() + recent_transactions = db.query(Transaction).filter( + Transaction.from_account_id == account.id, + Transaction.transaction_date <= current_datetime + ).order_by(desc(Transaction.transaction_date)).limit(5).all() + + recent_txn_list = [{ + "id": txn.id, + "amount": txn.amount, + "recipient": txn.recipient, + "transaction_date": txn.transaction_date.strftime("%Y-%m-%d %H:%M:%S"), + "reference_id": txn.reference_id, + "category": txn.category, + "payment_method": txn.payment_method, + "transaction_type": txn.transaction_type or "" + } for txn in recent_transactions] + + data = { + "to": beneficiary.name, + "amount": amount, + "balance": account.balance, + "reference_id": reference_id, + "payment_method": payment_method, + "category": category, + "recent_transactions": recent_txn_list + } + + return { + "status": "success", + "message": f"₹{amount:.2f} sent successfully to {beneficiary.name}.", + "data": data + } + +@router.get("/transactions") +async def search_txn( + customer_id: int = None, + phone: str = None, + recipient: str = None, + category: str = None, + limit: int = 50, + start_date: str = None, + end_date: str = None, + db: Session = Depends(get_db) +): + if phone: + customer = db.query(Customer).filter( + Customer.phone == phone, + Customer.is_active == True + ).first() + if not customer: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Invalid phone number '{phone}'." + ) + customer_id = customer.id + + base_query = db.query(Transaction).order_by(desc(Transaction.transaction_date)) + + if customer_id is not None: + accounts = db.query(Account.id).filter(Account.customer_id == customer_id).all() + account_ids = [a.id for a in accounts] + if account_ids: + base_query = base_query.filter(Transaction.from_account_id.in_(account_ids)) + else: + return {"status": "success", "message": "No transactions found.", "data": {"transactions": []}} + + transaction_ids = set() + if recipient: + recipient_query = base_query.filter(Transaction.recipient.ilike(f"%{recipient}%")) + transactions_by_recipient = recipient_query.all() + transaction_ids = set(t.id for t in transactions_by_recipient) + + if category: + category_query = base_query.filter(Transaction.category.ilike(f"%{category}%")) + transactions_by_category = category_query.all() + category_ids = set(t.id for t in transactions_by_category) + transaction_ids = transaction_ids.intersection(category_ids) if transaction_ids else category_ids + + if not recipient and not category: + transaction_ids = set(t.id for t in base_query.all()) + + if not transaction_ids: + return {"status": "success", "message": "No matching transactions found.", "data": {"transactions": []}} + + filtered_query = db.query(Transaction).filter(Transaction.id.in_(transaction_ids)) + + if start_date: + try: + start_dt = datetime.strptime(start_date, "%Y-%m-%d") + filtered_query = filtered_query.filter(Transaction.transaction_date >= start_dt) + except ValueError: + raise HTTPException(status_code=400, detail=f"Invalid start date '{start_date}', use YYYY-MM-DD format.") + + if end_date: + try: + end_dt = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) + filtered_query = filtered_query.filter(Transaction.transaction_date < end_dt) + except ValueError: + raise HTTPException(status_code=400, detail=f"Invalid end date '{end_date}', use YYYY-MM-DD format.") + + db_transactions = filtered_query.order_by(desc(Transaction.transaction_date)).limit(limit).all() + + data = {"transactions": db_transactions} + + return { + "status": "success", + "message": f"{len(db_transactions)} transaction(s) retrieved successfully.", + "data": data + } + +@router.get("/beneficiaries") +def get_beneficiaries( + customer_id: int = None, + phone: str = None, + db: Session = Depends(get_db) +): + """Retrieve all beneficiaries for a given customer (by ID or phone) with all fields.""" + + if not customer_id and not phone: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Please provide either customer_id or phone." + ) + + # If phone is provided, find customer_id first + if phone: + customer = db.query(Customer).filter(Customer.phone == phone).first() + if not customer: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"No customer found with phone '{phone}'." + ) + customer_id = customer.id + + # Query beneficiaries + beneficiaries = db.query(Beneficiary).filter( + Beneficiary.customer_id == customer_id + ).all() + + if not beneficiaries: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="No beneficiaries found for this customer." + ) + + # Return all fields dynamically + beneficiaries = [ + {k: v for k, v in b.__dict__.items() if k != "_sa_instance_state"} + for b in beneficiaries + ] + + return{"beneficiaries": beneficiaries} diff --git a/service/config.py b/service/config.py index 0e6dcda..352b1a2 100644 --- a/service/config.py +++ b/service/config.py @@ -24,3 +24,36 @@ redis_port = int(os.getenv("REDIS_PORT", 6379)) redis_db = int(os.getenv("REDIS_DB", 0)) redis_password = os.getenv("REDIS_PASSWORD", None) +import os +from dotenv import load_dotenv + +load_dotenv() + +openai_api_key = os.getenv("OPENAI_API_KEY") +#model_id = os.getenv('MODEL_ID', 'large-v3') +model_id = os.getenv('MODEL_ID','small') +model_path = os.getenv('MODEL_PATH', './models') +ollama_host = os.getenv("OLLAMA_HOST", "http://ollama:11434") +ollama_model_name = os.getenv("OLLAMA_MODEL_NAME", "llama3.2") +open_ai_model_name = os.getenv("OPENAI_MODEL_NAME", "gpt-4") +ollama_translation_model_name = os.getenv("OLLAMA_TRANS_MODEL","gemma2:latest") +open_ai_temperature = os.getenv("OPENAI_TEMPERATURE", 0.2) +db_user = os.getenv("DB_USER") +db_password = os.getenv("DB_PASSWORD") +db_host = os.getenv("DB_HOST") +db_port = os.getenv("DB_PORT") +db_name = os.getenv("DB_NAME") + +sarvam_api_key = os.getenv("SARVAM_API_KEY","sk_t7fvsjjb_7JsD5ZXGrEhHqjUtAQSFsCxB") +# Redis configuration +redis_host = os.getenv("REDIS_HOST", "localhost") +redis_port = int(os.getenv("REDIS_PORT", 6379)) +redis_db = int(os.getenv("REDIS_DB", 0)) +redis_password = os.getenv("REDIS_PASSWORD", None) + +# Langflow API configuration +langflow_api_url = os.getenv("LANGFLOW_API_URL", "http://localhost:7860") +langflow_flow_id = os.getenv("LANGFLOW_FLOW_ID", "df6ef421-30ef-4901-bc8b-270c2ce61d41") +langflow_api_key = os.getenv("LANGFLOW_API_KEY", "sk-SCQyDlsYB7qPzmzL3yivQs-J5JmvX82uHVbDiGWrQR8") +langflow_timeout = int(os.getenv("LANGFLOW_TIMEOUT", 60)) # timeout in seconds + diff --git a/service/main.py b/service/main.py index f11c046..3f84f4f 100644 --- a/service/main.py +++ b/service/main.py @@ -1,11 +1,13 @@ -from fastapi import FastAPI, UploadFile, File, Form, Depends +from fastapi import FastAPI, UploadFile, File, Form, Depends, BackgroundTasks from fastapi.responses import JSONResponse from logger import logger from dotenv import load_dotenv +from config import langflow_api_url, langflow_flow_id, langflow_api_key, langflow_timeout from starlette.middleware.cors import CORSMiddleware from audio_service import translate_with_whisper from audio_service import translate_with_whisper_timestamped, translate_with_whisper_from_upload from detect_intent import detect_intent_with_llama, format_intent_response, translate +from typing import Dict, Any, List, Optional from summarizer import summarize_using_openai from summarizer import summarize_using_ollama from pydantic import BaseModel @@ -13,15 +15,23 @@ from util import generate_timestamp_json from fastapi_versionizer.versionizer import Versionizer, api_version import json -from banking.core_banking_routes import router as banking_router +from banking.core_banking_routes_v2 import router as banking_router from orchestrator import orchestrate_banking_request from typing import Optional import httpx from redis_client import session_manager from datetime import datetime from session_service import SessionService, SessionFlowProcessor +from contextlib import asynccontextmanager -app = FastAPI() +# Set up async HTTP client for Langflow API +@asynccontextmanager +async def lifespan(app: FastAPI): + app.state.langflow_client = httpx.AsyncClient() + yield + await app.state.langflow_client.aclose() + +app = FastAPI(lifespan=lifespan) # Add CORS middleware to the application app.add_middleware( @@ -232,3 +242,165 @@ async def transcribe_intent( logger.error(f"Error in transcribe-intent: {traceback.format_exc()}") current_session_id = session_id if session_id else "unknown" return JSONResponse(content={"message": str(e), "session_id": current_session_id}, status_code=500) + + +@app.post("/voice/process-with-langflow") +async def process_with_langflow( + audio: Optional[UploadFile] = File(None), + session_id: Optional[str] = Form(None), + customer_id: Optional[int] = Form(None), + phone: Optional[str] = Form(None), + api_key: Optional[str] = Form(None) +): + """ + Process audio through Langflow for intent detection and API execution. + + Steps: + 1. Audio is transcribed using Whisper + 2. Transcribed text is sent to Langflow for processing + 3. Langflow processes the intent and calls necessary APIs + 4. Results are returned to the client + """ + try: + if not audio: + return JSONResponse(status_code=400, content={"message": "No audio file provided"}) + + # Step 1: Transcribe audio + id, response, lang, dia = translate_with_whisper_from_upload(audio) + translation_text = response[1] + language = lang[1] + + logger.info("Translation done") + logger.info(f"Translated text: {translation_text}") + logger.info(f"Detected language: {language}") + + # # Verify request has required authentication + # server_api_key = langflow_api_key + # if server_api_key: + # return JSONResponse( + # status_code=401, + # content={"message": "Invalid API key. Please provide a valid API key to access this endpoint."} + # ) + + # Step 2: Send to Langflow + langflow_response = await call_langflow_api(translation_text, language, customer_id, phone, session_id, api_key) + + # Step 3: Format and return response + return JSONResponse(content={ + "status": "success", + "message": "Audio processed through Langflow successfully", + "translation": translation_text, + "language": language, + "response": langflow_response, + "session_id": session_id + }, status_code=200) + + except Exception as e: + logger.error(f"Error in process-with-langflow: {traceback.format_exc()}") + current_session_id = session_id if session_id else "unknown" + + # Categorize errors for better client handling + status_code = 500 + error_type = "server_error" + + if "Invalid API key" in str(e): + status_code = 401 + error_type = "authentication_error" + elif "timed out" in str(e): + status_code = 504 + error_type = "timeout_error" + elif "rate limit" in str(e): + status_code = 429 + error_type = "rate_limit_error" + + return JSONResponse(content={ + "message": str(e), + "session_id": current_session_id, + "error_type": error_type + }, status_code=status_code) + + +async def call_langflow_api(text: str, language: str, customer_id: Optional[int] = None, + phone: Optional[str] = None, session_id: Optional[str] = None, + client_api_key: Optional[str] = None) -> Dict[str, Any]: + """ + Call the Langflow API with the translated text. + + Args: + text: The translated text + language: Detected language code + customer_id: Optional customer ID + phone: Optional phone number + session_id: Optional session ID + + Returns: + Processed response from Langflow + """ + try: + # Construct the URL for the Langflow API + url = f"{langflow_api_url}/api/v1/run/{langflow_flow_id}" + + # Prepare the payload + payload = { + "inputs": { + "text": text, + "language": language, + "customer_id": customer_id, + "phone": phone, + "session_id": session_id + }, + "tweaks": {} + } + + # Set up headers + headers = { + "Content-Type": "application/json" + } + + # Add API key if available - prioritize client API key over server API key + api_key_to_use = client_api_key or langflow_api_key + if api_key_to_use: + # Set the x-api-key header as required by Langflow v1.5+ + headers["x-api-key"] = api_key_to_use + + logger.info(f"Calling Langflow API at {url}") + # Make the API call + async with httpx.AsyncClient(timeout=langflow_timeout) as client: + response = await client.post(url, json=payload, headers=headers) + + # Check if the request was successful + response.raise_for_status() + + # Additional validation for empty responses + if response.status_code == 204 or not response.text: + logger.warning("Langflow API returned an empty response") + return {"warning": "Langflow returned an empty response. The flow might not be configured correctly."} + + # Handle rate limiting + if response.status_code == 429: + logger.warning("Langflow API rate limit reached") + raise Exception("Langflow API rate limit reached. Please try again later.") + + # Parse the response + result = response.json() + logger.info(f"Langflow API response: {result}") + # Sanitize the result to avoid exposing sensitive information + if isinstance(result, dict): + # Remove any potential sensitive information + result.pop("api_key", None) + result.pop("auth_token", None) + result.pop("password", None) + + logger.info(f"Langflow API response processed successfully") + + return result + except httpx.TimeoutException: + logger.error("Langflow API call timed out") + raise Exception("Langflow processing timed out. Please try again later.") + except httpx.HTTPStatusError as e: + logger.error(f"Langflow API HTTP error: {e}") + raise Exception(f"Error calling Langflow API: {e.response.status_code} - {e.response.text}") + except Exception as e: + logger.error(f"Error calling Langflow API: {str(e)}") + raise Exception(f"Error processing with Langflow: {str(e)}") +