FastAPI ۋە LangChain نى بىرلەشتۈرۈپ، ئىشلەتكۈچى كۆپ بولغان Production دەرىجىدىكى AI API قۇرۇش. Streaming، Auth، RAG، Agent، WebSocket — تولۇق كود مىسالى بىلەن.
# ── ئورنىتىش ───────────────────────────────────────────── pip install fastapi uvicorn[standard] python-dotenv pip install langchain langchain-openai langchain-anthropic pip install langchain-chroma sentence-transformers pip install pydantic pydantic-settings pip install python-jose[cryptography] passlib[bcrypt] pip install redis slowapi httpx # ── لايىھە قۇرۇلمىسى ───────────────────────────────────── ai_api/ ├── app/ │ ├── __init__.py │ ├── main.py ← FastAPI ئاساسلىق ھۆججىتى │ ├── config.py ← تەڭشەك │ ├── dependencies.py ← ئورتاق تارماق │ ├── api/ │ │ ├── v1/ │ │ │ ├── chat.py ← سۆھبەت endpoint │ │ │ ├── rag.py ← RAG endpoint │ │ │ ├── agent.py ← Agent endpoint │ │ │ └── stream.py ← Streaming endpoint │ ├── core/ │ │ ├── auth.py ← JWT تەستىقلاش │ │ ├── rate_limit.py ← تېزلىك چەك │ │ └── middleware.py ← CORS، Logging │ ├── models/ │ │ ├── schemas.py ← Pydantic مودېللار │ │ └── db.py ← سانلىق مەلۇمات │ └── services/ │ ├── llm_service.py ← LLM باشقۇرۇش │ ├── rag_service.py ← RAG تىزىمى │ └── memory_service.py ├── tests/ ├── Dockerfile ├── docker-compose.yml └── .env
from pydantic_settings import BaseSettings from functools import lru_cache class Settings(BaseSettings): # LLM openai_api_key: str anthropic_api_key: str = "" default_model: str = "gpt-4o-mini" # JWT secret_key: str algorithm: str = "HS256" access_token_expire_minutes: int = 30 # Redis redis_url: str = "redis://localhost:6379" # Vector DB chroma_path: str = "./chroma_db" # تېزلىك چەك rate_limit_per_minute: int = 60 class Config: env_file = ".env" @lru_cache() def get_settings() -> Settings: return Settings()
from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager from app.api.v1 import chat, rag, agent, stream from app.core.middleware import LoggingMiddleware from app.config import get_settings settings = get_settings() # ── Startup / Shutdown ──────────────────────────────────── @asynccontextmanager async def lifespan(app: FastAPI): # باشلانغاندا print("🚀 AI API باشلاندى") yield # توختىغاندا print("🛑 AI API توختىدى") # ── FastAPI ئاپپ ───────────────────────────────────────── app = FastAPI( title = "idirak.com AI API", description = "LangChain + FastAPI ئاساسلىق AI مۇلازىمەت API", version = "1.0.0", lifespan = lifespan, docs_url = "/docs", # Swagger UI redoc_url = "/redoc", # ReDoc UI ) # ── CORS ───────────────────────────────────────────────── app.add_middleware( CORSMiddleware, allow_origins = ["https://idirak.com", "http://localhost:3000"], allow_credentials = True, allow_methods = ["*"], allow_headers = ["*"], ) app.add_middleware(LoggingMiddleware) # ── Router تىزىملىشى ───────────────────────────────────── app.include_router(chat.router, prefix="/api/v1/chat", tags=["chat"]) app.include_router(rag.router, prefix="/api/v1/rag", tags=["rag"]) app.include_router(agent.router, prefix="/api/v1/agent", tags=["agent"]) app.include_router(stream.router, prefix="/api/v1/stream", tags=["stream"]) # ── ئاساسلىق نۇقتا ─────────────────────────────────────── @app.get("/") async def root(): return {"message": "idirak.com AI API", "version": "1.0.0", "status": "running"} @app.get("/health") async def health(): return {"status": "healthy"} # ── ئىجرا ──────────────────────────────────────────────── # uvicorn app.main:app --reload --port 8000
from pydantic import BaseModel, Field, field_validator from typing import Optional, List, Literal from enum import Enum # ── مودېل تاللانمىلىرى ──────────────────────────────────── class ModelChoice(str, Enum): GPT4O = "gpt-4o" GPT4O_MINI = "gpt-4o-mini" CLAUDE = "claude-sonnet-4-6" OLLAMA = "llama3.1:8b" # ── سۆھبەت مۇناسىۋەت ───────────────────────────────────── class Message(BaseModel): role: Literal["system", "user", "assistant"] content: str = Field(..., min_length=1, max_length=32000) # ── Chat سوئال ─────────────────────────────────────────── class ChatRequest(BaseModel): messages: List[Message] model: ModelChoice = ModelChoice.GPT4O_MINI temperature: float = Field(0.7, ge=0, le=2) max_tokens: int = Field(1024, ge=1, le=4096) stream: bool = False session_id: Optional[str] = None @field_validator("messages") @classmethod def validate_messages(cls, v): if not v: raise ValueError("messages بوش بولمىسۇن") return v # ── Chat جاۋاپ ─────────────────────────────────────────── class ChatResponse(BaseModel): id: str content: str model: str tokens_used: int created_at: str # ── RAG سوئال ──────────────────────────────────────────── class RAGRequest(BaseModel): query: str = Field(..., min_length=1) collection: str = "default" top_k: int = Field(5, ge=1, le=20) model: ModelChoice = ModelChoice.GPT4O_MINI with_sources: bool = True class RAGResponse(BaseModel): answer: str sources: List[dict] tokens: int # ── JWT Token ──────────────────────────────────────────── class Token(BaseModel): access_token: str token_type: str = "bearer" expires_in: int class TokenData(BaseModel): user_id: Optional[str] = None scopes: List[str] = []
from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, APIKeyHeader from jose import JWTError, jwt from datetime import datetime, timedelta from app.config import get_settings settings = get_settings() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token") api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) # ── JWT Token ياساش ─────────────────────────────────────── def create_access_token(data: dict) -> str: to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes) to_encode.update({"exp": expire}) return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm) # ── JWT Token تەستىقلاش ─────────────────────────────────── async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict: credentials_exception = HTTPException( status_code = status.HTTP_401_UNAUTHORIZED, detail = "كىملىك دەلىللىنىمىدى", headers = {"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) user_id: str = payload.get("sub") if user_id is None: raise credentials_exception return {"user_id": user_id} except JWTError: raise credentials_exception # ── API Key تەستىقلاش (ئاددىي) ─────────────────────────── async def verify_api_key(api_key: str = Depends(api_key_header)): valid_keys = {"idirak-key-001", "idirak-key-002"} # DB دىن ئالغىن if not api_key or api_key not in valid_keys: raise HTTPException(status_code=403, detail="API كىلىتى ئىناۋەتسىز") return api_key # ── تېزلىك چەك (Rate Limiting with Redis) ──────────────── import redis.asyncio as aioredis redis_client = aioredis.from_url(settings.redis_url) async def rate_limiter(user_id: str, limit: int = 60): key = f"rate:{user_id}:{datetime.utcnow().minute}" count = await redis_client.incr(key) await redis_client.expire(key, 60) if count > limit: raise HTTPException(status_code=429, detail="تېزلىك چەكتىن ئاشتى")
from langchain_openai import ChatOpenAI from langchain_anthropic import ChatAnthropic from langchain_ollama import ChatOllama from langchain_core.messages import HumanMessage, SystemMessage, AIMessage from langchain_core.output_parsers import StrOutputParser from app.models.schemas import ModelChoice, Message from app.config import get_settings from typing import AsyncGenerator settings = get_settings() def get_llm(model: ModelChoice, temperature: float = 0.7, streaming: bool = False): match model: case ModelChoice.GPT4O | ModelChoice.GPT4O_MINI: return ChatOpenAI(model=model.value, temperature=temperature, streaming=streaming) case ModelChoice.CLAUDE: return ChatAnthropic(model=model.value, temperature=temperature, streaming=streaming) case ModelChoice.OLLAMA: return ChatOllama(model=model.value, temperature=temperature) def convert_messages(messages: list[Message]): result = [] for m in messages: match m.role: case "system": result.append(SystemMessage(content=m.content)) case "user": result.append(HumanMessage(content=m.content)) case "assistant": result.append(AIMessage(content=m.content)) return result async def stream_response(messages, model, temperature) -> AsyncGenerator[str, None]: llm = get_llm(model, temperature, streaming=True) lc_msgs = convert_messages(messages) async for chunk in llm.astream(lc_msgs): yield chunk.content
from fastapi import APIRouter, Depends, HTTPException from app.models.schemas import ChatRequest, ChatResponse from app.core.auth import get_current_user, rate_limiter from app.services.llm_service import get_llm, convert_messages from datetime import datetime import uuid router = APIRouter() @router.post("/completions", response_model=ChatResponse) async def chat_completions( request: ChatRequest, current_user: dict = Depends(get_current_user), ): # تېزلىك چەك تەكشۈرۈش await rate_limiter(current_user["user_id"]) try: llm = get_llm(request.model, request.temperature) messages = convert_messages(request.messages) response = await llm.ainvoke(messages) return ChatResponse( id = str(uuid.uuid4()), content = response.content, model = request.model.value, tokens_used = response.usage_metadata.get("total_tokens", 0), created_at = datetime.utcnow().isoformat(), ) except Exception as e: raise HTTPException(status_code=500, detail=str(e))
from fastapi import APIRouter, Depends from fastapi.responses import StreamingResponse from app.models.schemas import ChatRequest from app.core.auth import get_current_user from app.services.llm_service import stream_response import json router = APIRouter() @router.post("/chat") async def stream_chat( request: ChatRequest, current_user: dict = Depends(get_current_user), ): async def generate(): async for chunk in stream_response( request.messages, request.model, request.temperature ): if chunk: data = json.dumps({"content": chunk, "done": False}) yield f"data: {data}\n\n" # تاماملاندى بەلگىسى yield f"data: {json.dumps({'content': '', 'done': True})}\n\n" return StreamingResponse( generate(), media_type = "text/event-stream", headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } ) # ── Frontend JavaScript مىسالى ──────────────────────────── """ const response = await fetch('/api/v1/stream/chat', { method: 'POST', headers: {'Authorization': `Bearer ${token}`, 'Content-Type': 'application/json'}, body: JSON.stringify({messages: [{role: 'user', content: 'سالام!'}]}) }); const reader = response.body.getReader(); while (true) { const {done, value} = await reader.read(); if (done) break; const text = new TextDecoder().decode(value); const lines = text.split('\n\n'); for (const line of lines) { if (line.startsWith('data: ')) { const data = JSON.parse(line.slice(6)); if (!data.done) console.log(data.content); } } } """
from fastapi import APIRouter, UploadFile, File, Depends, HTTPException from langchain_community.document_loaders import PyPDFLoader, TextLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_openai import OpenAIEmbeddings from langchain_chroma import Chroma from langchain.chains import create_retrieval_chain from langchain.chains.combine_documents import create_stuff_documents_chain from langchain_core.prompts import ChatPromptTemplate from app.models.schemas import RAGRequest, RAGResponse from app.services.llm_service import get_llm from app.core.auth import get_current_user import tempfile, os router = APIRouter() embeddings = OpenAIEmbeddings(model="text-embedding-3-small") # ── ھۆججەت يوللاش ──────────────────────────────────────── @router.post("/upload") async def upload_document( file: UploadFile = File(...), collection: str = "default", current_user: dict = Depends(get_current_user), ): # ھۆججەت تۈرى تەكشۈرۈش if not file.filename.endswith((".pdf", ".txt", ".md")): raise HTTPException(status_code=400, detail="PDF، TXT، MD قوبۇل قىلىنىدۇ") # ۋاقىتلىق ھۆججەت ساقلاش with tempfile.NamedTemporaryFile(delete=False, suffix=file.filename) as tmp: tmp.write(await file.read()) tmp_path = tmp.name try: # يوللاش ۋە بۆلۈش loader = PyPDFLoader(tmp_path) if file.filename.endswith(".pdf") else TextLoader(tmp_path) docs = loader.load() splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64) chunks = splitter.split_documents(docs) # Vector DB غا قوشۇش db = Chroma(collection_name=collection, embedding_function=embeddings, persist_directory="./chroma") db.add_documents(chunks) return {"message": f"{len(chunks)} بۆلۈك قوشۇلدى", "collection": collection} finally: os.unlink(tmp_path) # ── RAG سوئال ──────────────────────────────────────────── @router.post("/query", response_model=RAGResponse) async def rag_query( request: RAGRequest, current_user: dict = Depends(get_current_user), ): db = Chroma(collection_name=request.collection, embedding_function=embeddings, persist_directory="./chroma") retriever = db.as_retriever(search_kwargs={"k": request.top_k}) llm = get_llm(request.model) qa_prompt = ChatPromptTemplate.from_messages([ ("system", "پەقەت بىرىلگەن مەزمۇنغا ئاساسلانغان جاۋاپ بەر:\n{context}"), ("human", "{input}"), ]) chain = create_retrieval_chain(retriever, create_stuff_documents_chain(llm, qa_prompt)) result = await chain.ainvoke({"input": request.query}) sources = [] if request.with_sources: sources = [{ "content": d.page_content[:200], "source": d.metadata.get("source", "نامەلۇم"), } for d in result["context"]] return RAGResponse(answer=result["answer"], sources=sources, tokens=0)
from fastapi import APIRouter, Depends from pydantic import BaseModel from langchain.agents import AgentExecutor, create_tool_calling_agent from langchain.tools import tool from langchain_community.tools import DuckDuckGoSearchRun from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from app.services.llm_service import get_llm from app.models.schemas import ModelChoice from app.core.auth import get_current_user router = APIRouter() class AgentRequest(BaseModel): task: str model: ModelChoice = ModelChoice.GPT4O_MINI # ── قوراللار تارىفى ────────────────────────────────────── @tool def calculate(expression: str) -> str: """ماتېماتىكىلىق ئىپادىنى ھىسابلا. مىسال: 2+2, sqrt(16)""" import math try: return str(eval(expression, {"__builtins__": {}}, vars(math))) except Exception as e: return f"خاتالىق: {e}" tools = [DuckDuckGoSearchRun(), calculate] @router.post("/run") async def run_agent( request: AgentRequest, current_user: dict = Depends(get_current_user), ): llm = get_llm(request.model) prompt = ChatPromptTemplate.from_messages([ ("system", "سەن ئۇيغۇرچە ياردەمچىسەن. قورالدىن ئىشلىت."), ("human", "{input}"), MessagesPlaceholder("agent_scratchpad"), ]) agent = create_tool_calling_agent(llm, tools, prompt) executor = AgentExecutor(agent=agent, tools=tools, max_iterations=5) result = await executor.ainvoke({"input": request.task}) return {"output": result["output"]}
from fastapi import APIRouter, WebSocket, WebSocketDisconnect from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, SystemMessage import json router = APIRouter() # بارلىق باغلانغان ئىشلەتكۈچىلەرنى ساقلاش connections: dict[str, WebSocket] = {} @router.websocket("/ws/{session_id}") async def websocket_chat(websocket: WebSocket, session_id: str): await websocket.accept() connections[session_id] = websocket llm = ChatOpenAI(model="gpt-4o-mini", streaming=True) history = [SystemMessage(content="سەن ئۇيغۇرچە ياردەمچىسەن.")] try: while True: # ئىشلەتكۈچىدىن سوئال قوبۇل قىلىش data = await websocket.receive_text() payload = json.loads(data) history.append(HumanMessage(content=payload["message"])) # ئاقىملىق جاۋاپ يوللاش full_response = "" async for chunk in llm.astream(history): if chunk.content: full_response += chunk.content await websocket.send_json({ "type": "chunk", "content": chunk.content }) # تاماملاندى await websocket.send_json({"type": "done", "content": ""}) history.append(AIMessage(content=full_response)) except WebSocketDisconnect: connections.pop(session_id, None)
FROM python:3.12-slim WORKDIR /app # تەلەپلەر COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # كود COPY . . # Healthcheck HEALTHCHECK --interval=30s --timeout=10s \ CMD curl -f http://localhost:8000/health || exit 1 # ئىجرا CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
version: '3.9' services: api: build: . ports: ["8000:8000"] environment: - OPENAI_API_KEY=${OPENAI_API_KEY} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - SECRET_KEY=${SECRET_KEY} - REDIS_URL=redis://redis:6379 depends_on: [redis] volumes: - ./chroma_db:/app/chroma_db restart: unless-stopped redis: image: redis:7-alpine volumes: [redis_data:/data] restart: unless-stopped nginx: image: nginx:alpine ports: ["80:80", "443:443"] volumes: - ./nginx.conf:/etc/nginx/nginx.conf - ./certs:/etc/ssl/certs depends_on: [api] volumes: {redis_data:}
| نۇقتا | ئۇسۇل | ئۇيغۇرچە | Auth |
|---|---|---|---|
| /api/v1/auth/token | POST | JWT token ئالىش | ئاچىق |
| /api/v1/chat/completions | POST | LLM جاۋاپ ئالىش | JWT |
| /api/v1/stream/chat | POST | ئاقىملىق SSE جاۋاپ | JWT |
| /api/v1/rag/upload | POST | ھۆججەت يوللاش | JWT |
| /api/v1/rag/query | POST | RAG سوئال-جاۋاپ | JWT |
| /api/v1/agent/run | POST | Agent ۋەزىپىسى | JWT |
| /ws/{session_id} | WS | يىلتىز سۆھبەت | JWT |
| /health | GET | سىستېما ساغلاملىقى | ئاچىق |
| /docs | GET | Swagger UI | ئاچىق |