POST /api/v1/chat/completions
GET /api/v1/models
POST /api/v1/embeddings
WS /ws/stream
POST /api/v1/rag/query
GET POST PUT DEL FastAPI + LangChain

FastAPI + LangChain

Production AI API — Complete Uyghur Reference

FastAPI ۋە LangChain نى بىرلەشتۈرۈپ، ئىشلەتكۈچى كۆپ بولغان Production دەرىجىدىكى AI API قۇرۇش. Streaming، Auth، RAG، Agent، WebSocket — تولۇق كود مىسالى بىلەن.

FastAPIASGI Framework
🔗 LangChainLLM Orchestration
UvicornASGI Server
PydanticData Validation
⚙️ 01 — ئورنىتىش ۋە قۇرۇلما 🚀 02 — FastAPI ئاساسى 📦 03 — Pydantic مودېل 🔐 04 — Auth & Security 🤖 05 — LangChain بىرلەشتۈرۈش 📡 06 — Streaming API 🔍 07 — RAG Endpoint 🤖 08 — Agent Endpoint 🌐 09 — WebSocket 🚢 10 — Docker & Deploy
01
ئورنىتىش ۋە لايىھە قۇرۇلمىسى
Installation & Project Structure
Production دەرىجىدىكى FastAPI + LangChain لايىھىسىنى باشتىن قۇرۇش
FastAPI — Python نىڭ ئەڭ تېز ۋە ئازاقلى ئۇسۇلدا API قۇرىدىغان چارچۇۋىسى. OpenAPI (Swagger) ئاپتوماتىك ھاسىللايدۇ. Pydantic بىلەن سانلىق دەلىللىيىدۇ. LangChain + FastAPI = AI API ئۈچۈن ئەڭ ياخشى تاللاش.
BASHsetup.sh
# ── ئورنىتىش ─────────────────────────────────────────────
pip install fastapi uvicorn[standard] python-dotenv
pip install langchain langchain-openai langchain-anthropic
pip install langchain-chroma sentence-transformers
pip install pydantic pydantic-settings
pip install python-jose[cryptography] passlib[bcrypt]
pip install redis slowapi httpx

# ── لايىھە قۇرۇلمىسى ─────────────────────────────────────
ai_api/
├── app/
│   ├── __init__.py
│   ├── main.py              ← FastAPI ئاساسلىق ھۆججىتى
│   ├── config.py            ← تەڭشەك
│   ├── dependencies.py      ← ئورتاق تارماق
│   ├── api/
│   │   ├── v1/
│   │   │   ├── chat.py      ← سۆھبەت endpoint
│   │   │   ├── rag.py       ← RAG endpoint
│   │   │   ├── agent.py     ← Agent endpoint
│   │   │   └── stream.py    ← Streaming endpoint
│   ├── core/
│   │   ├── auth.py          ← JWT تەستىقلاش
│   │   ├── rate_limit.py    ← تېزلىك چەك
│   │   └── middleware.py    ← CORS، Logging
│   ├── models/
│   │   ├── schemas.py       ← Pydantic مودېللار
│   │   └── db.py            ← سانلىق مەلۇمات
│   └── services/
│       ├── llm_service.py   ← LLM باشقۇرۇش
│       ├── rag_service.py   ← RAG تىزىمى
│       └── memory_service.py
├── tests/
├── Dockerfile
├── docker-compose.yml
└── .env
PYTHONapp/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    # LLM
    openai_api_key:     str
    anthropic_api_key:  str = ""
    default_model:      str = "gpt-4o-mini"

    # JWT
    secret_key:         str
    algorithm:          str = "HS256"
    access_token_expire_minutes: int = 30

    # Redis
    redis_url:          str = "redis://localhost:6379"

    # Vector DB
    chroma_path:        str = "./chroma_db"

    # تېزلىك چەك
    rate_limit_per_minute: int = 60

    class Config:
        env_file = ".env"

@lru_cache()
def get_settings() -> Settings:
    return Settings()
02
FastAPI ئاساسى — main.py
FastAPI Core — App Setup, Routers, Middleware
ئاساسلىق ئاپپ قۇرۇلمىسى، Middleware، Router تىزىملىشى
PYTHONapp/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.api.v1 import chat, rag, agent, stream
from app.core.middleware import LoggingMiddleware
from app.config import get_settings

settings = get_settings()

# ── Startup / Shutdown ────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    # باشلانغاندا
    print("🚀 AI API باشلاندى")
    yield
    # توختىغاندا
    print("🛑 AI API توختىدى")

# ── FastAPI ئاپپ ─────────────────────────────────────────
app = FastAPI(
    title       = "idirak.com AI API",
    description = "LangChain + FastAPI ئاساسلىق AI مۇلازىمەت API",
    version     = "1.0.0",
    lifespan    = lifespan,
    docs_url    = "/docs",    # Swagger UI
    redoc_url   = "/redoc",   # ReDoc UI
)

# ── CORS ─────────────────────────────────────────────────
app.add_middleware(
    CORSMiddleware,
    allow_origins     = ["https://idirak.com", "http://localhost:3000"],
    allow_credentials = True,
    allow_methods     = ["*"],
    allow_headers     = ["*"],
)
app.add_middleware(LoggingMiddleware)

# ── Router تىزىملىشى ─────────────────────────────────────
app.include_router(chat.router,   prefix="/api/v1/chat",   tags=["chat"])
app.include_router(rag.router,    prefix="/api/v1/rag",    tags=["rag"])
app.include_router(agent.router,  prefix="/api/v1/agent",  tags=["agent"])
app.include_router(stream.router, prefix="/api/v1/stream", tags=["stream"])

# ── ئاساسلىق نۇقتا ───────────────────────────────────────
@app.get("/")
async def root():
    return {"message": "idirak.com AI API", "version": "1.0.0", "status": "running"}

@app.get("/health")
async def health():
    return {"status": "healthy"}

# ── ئىجرا ────────────────────────────────────────────────
# uvicorn app.main:app --reload --port 8000
03
Pydantic مودېل ۋە Schema
Pydantic Models — Request & Response Schemas
API كىرگۈزمە/چىقىم دەلىللىشى — ئاپتوماتىك OpenAPI ھۆججىتى
PYTHONapp/models/schemas.py
from pydantic import BaseModel, Field, field_validator
from typing  import Optional, List, Literal
from enum    import Enum

# ── مودېل تاللانمىلىرى ────────────────────────────────────
class ModelChoice(str, Enum):
    GPT4O       = "gpt-4o"
    GPT4O_MINI  = "gpt-4o-mini"
    CLAUDE      = "claude-sonnet-4-6"
    OLLAMA      = "llama3.1:8b"

# ── سۆھبەت مۇناسىۋەت ─────────────────────────────────────
class Message(BaseModel):
    role:    Literal["system", "user", "assistant"]
    content: str = Field(..., min_length=1, max_length=32000)

# ── Chat سوئال ───────────────────────────────────────────
class ChatRequest(BaseModel):
    messages:    List[Message]
    model:       ModelChoice = ModelChoice.GPT4O_MINI
    temperature: float        = Field(0.7, ge=0, le=2)
    max_tokens:  int          = Field(1024, ge=1, le=4096)
    stream:      bool         = False
    session_id:  Optional[str] = None

    @field_validator("messages")
    @classmethod
    def validate_messages(cls, v):
        if not v:
            raise ValueError("messages بوش بولمىسۇن")
        return v

# ── Chat جاۋاپ ───────────────────────────────────────────
class ChatResponse(BaseModel):
    id:           str
    content:      str
    model:        str
    tokens_used:  int
    created_at:   str

# ── RAG سوئال ────────────────────────────────────────────
class RAGRequest(BaseModel):
    query:        str   = Field(..., min_length=1)
    collection:   str   = "default"
    top_k:        int   = Field(5, ge=1, le=20)
    model:        ModelChoice = ModelChoice.GPT4O_MINI
    with_sources: bool = True

class RAGResponse(BaseModel):
    answer:    str
    sources:   List[dict]
    tokens:    int

# ── JWT Token ────────────────────────────────────────────
class Token(BaseModel):
    access_token:  str
    token_type:    str = "bearer"
    expires_in:    int

class TokenData(BaseModel):
    user_id:   Optional[str] = None
    scopes:    List[str]     = []
04
Auth ۋە بىخەتەرلىك
Authentication — JWT, API Key, Rate Limiting
JWT تەستىقلاش، API كىلىتى، تېزلىك چەك — Production بىخەتەرلىك
PYTHONapp/core/auth.py
from fastapi          import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader
from jose            import JWTError, jwt
from datetime        import datetime, timedelta
from app.config      import get_settings

settings       = get_settings()
oauth2_scheme  = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

# ── JWT Token ياساش ───────────────────────────────────────
def create_access_token(data: dict) -> str:
    to_encode = data.copy()
    expire    = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)

# ── JWT Token تەستىقلاش ───────────────────────────────────
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    credentials_exception = HTTPException(
        status_code = status.HTTP_401_UNAUTHORIZED,
        detail      = "كىملىك دەلىللىنىمىدى",
        headers     = {"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
        return {"user_id": user_id}
    except JWTError:
        raise credentials_exception

# ── API Key تەستىقلاش (ئاددىي) ───────────────────────────
async def verify_api_key(api_key: str = Depends(api_key_header)):
    valid_keys = {"idirak-key-001", "idirak-key-002"}  # DB دىن ئالغىن
    if not api_key or api_key not in valid_keys:
        raise HTTPException(status_code=403, detail="API كىلىتى ئىناۋەتسىز")
    return api_key

# ── تېزلىك چەك (Rate Limiting with Redis) ────────────────
import redis.asyncio as aioredis

redis_client = aioredis.from_url(settings.redis_url)

async def rate_limiter(user_id: str, limit: int = 60):
    key   = f"rate:{user_id}:{datetime.utcnow().minute}"
    count = await redis_client.incr(key)
    await redis_client.expire(key, 60)
    if count > limit:
        raise HTTPException(status_code=429, detail="تېزلىك چەكتىن ئاشتى")
05
LangChain بىرلەشتۈرۈش
LangChain + FastAPI — Chat Endpoint
LLM خىزمەت قاتلىمى ۋە Chat API نۇقتىسى
PYTHONapp/services/llm_service.py
from langchain_openai    import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_ollama   import ChatOllama
from langchain_core.messages        import HumanMessage, SystemMessage, AIMessage
from langchain_core.output_parsers  import StrOutputParser
from app.models.schemas import ModelChoice, Message
from app.config         import get_settings
from typing             import AsyncGenerator

settings = get_settings()

def get_llm(model: ModelChoice, temperature: float = 0.7, streaming: bool = False):
    match model:
        case ModelChoice.GPT4O | ModelChoice.GPT4O_MINI:
            return ChatOpenAI(model=model.value, temperature=temperature, streaming=streaming)
        case ModelChoice.CLAUDE:
            return ChatAnthropic(model=model.value, temperature=temperature, streaming=streaming)
        case ModelChoice.OLLAMA:
            return ChatOllama(model=model.value, temperature=temperature)

def convert_messages(messages: list[Message]):
    result = []
    for m in messages:
        match m.role:
            case "system":    result.append(SystemMessage(content=m.content))
            case "user":      result.append(HumanMessage(content=m.content))
            case "assistant": result.append(AIMessage(content=m.content))
    return result

async def stream_response(messages, model, temperature) -> AsyncGenerator[str, None]:
    llm      = get_llm(model, temperature, streaming=True)
    lc_msgs  = convert_messages(messages)
    async for chunk in llm.astream(lc_msgs):
        yield chunk.content
PYTHONapp/api/v1/chat.py
from fastapi              import APIRouter, Depends, HTTPException
from app.models.schemas  import ChatRequest, ChatResponse
from app.core.auth       import get_current_user, rate_limiter
from app.services.llm_service import get_llm, convert_messages
from datetime             import datetime
import uuid

router = APIRouter()

@router.post("/completions", response_model=ChatResponse)
async def chat_completions(
    request:      ChatRequest,
    current_user: dict = Depends(get_current_user),
):
    # تېزلىك چەك تەكشۈرۈش
    await rate_limiter(current_user["user_id"])

    try:
        llm      = get_llm(request.model, request.temperature)
        messages = convert_messages(request.messages)
        response = await llm.ainvoke(messages)

        return ChatResponse(
            id           = str(uuid.uuid4()),
            content      = response.content,
            model        = request.model.value,
            tokens_used  = response.usage_metadata.get("total_tokens", 0),
            created_at   = datetime.utcnow().isoformat(),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
06
Streaming API — SSE
Server-Sent Events Streaming
تامغا-تامغا ئاقىملىق جاۋاپ — ChatGPT ئۇسلۇبى
PYTHONapp/api/v1/stream.py
from fastapi           import APIRouter, Depends
from fastapi.responses import StreamingResponse
from app.models.schemas        import ChatRequest
from app.core.auth            import get_current_user
from app.services.llm_service import stream_response
import json

router = APIRouter()

@router.post("/chat")
async def stream_chat(
    request:      ChatRequest,
    current_user: dict = Depends(get_current_user),
):
    async def generate():
        async for chunk in stream_response(
            request.messages,
            request.model,
            request.temperature
        ):
            if chunk:
                data = json.dumps({"content": chunk, "done": False})
                yield f"data: {data}\n\n"
        # تاماملاندى بەلگىسى
        yield f"data: {json.dumps({'content': '', 'done': True})}\n\n"

    return StreamingResponse(
        generate(),
        media_type = "text/event-stream",
        headers    = {
            "Cache-Control":  "no-cache",
            "Connection":     "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )

# ── Frontend JavaScript مىسالى ────────────────────────────
"""
const response = await fetch('/api/v1/stream/chat', {
  method: 'POST',
  headers: {'Authorization': `Bearer ${token}`, 'Content-Type': 'application/json'},
  body: JSON.stringify({messages: [{role: 'user', content: 'سالام!'}]})
});

const reader = response.body.getReader();
while (true) {
  const {done, value} = await reader.read();
  if (done) break;
  const text = new TextDecoder().decode(value);
  const lines = text.split('\n\n');
  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = JSON.parse(line.slice(6));
      if (!data.done) console.log(data.content);
    }
  }
}
"""
07
RAG API نۇقتىسى
RAG Endpoint — Document Upload & Query
ھۆججەت يوللاش، Vector DB، سوئال-جاۋاپ نۇقتىلىرى
PYTHONapp/api/v1/rag.py
from fastapi  import APIRouter, UploadFile, File, Depends, HTTPException
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter             import RecursiveCharacterTextSplitter
from langchain_openai                    import OpenAIEmbeddings
from langchain_chroma                    import Chroma
from langchain.chains                    import create_retrieval_chain
from langchain.chains.combine_documents  import create_stuff_documents_chain
from langchain_core.prompts              import ChatPromptTemplate
from app.models.schemas  import RAGRequest, RAGResponse
from app.services.llm_service import get_llm
from app.core.auth        import get_current_user
import tempfile, os

router     = APIRouter()
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# ── ھۆججەت يوللاش ────────────────────────────────────────
@router.post("/upload")
async def upload_document(
    file:         UploadFile = File(...),
    collection:   str        = "default",
    current_user: dict       = Depends(get_current_user),
):
    # ھۆججەت تۈرى تەكشۈرۈش
    if not file.filename.endswith((".pdf", ".txt", ".md")):
        raise HTTPException(status_code=400, detail="PDF، TXT، MD قوبۇل قىلىنىدۇ")

    # ۋاقىتلىق ھۆججەت ساقلاش
    with tempfile.NamedTemporaryFile(delete=False, suffix=file.filename) as tmp:
        tmp.write(await file.read())
        tmp_path = tmp.name

    try:
        # يوللاش ۋە بۆلۈش
        loader   = PyPDFLoader(tmp_path) if file.filename.endswith(".pdf") else TextLoader(tmp_path)
        docs     = loader.load()
        splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
        chunks   = splitter.split_documents(docs)

        # Vector DB غا قوشۇش
        db = Chroma(collection_name=collection, embedding_function=embeddings, persist_directory="./chroma")
        db.add_documents(chunks)

        return {"message": f"{len(chunks)} بۆلۈك قوشۇلدى", "collection": collection}
    finally:
        os.unlink(tmp_path)

# ── RAG سوئال ────────────────────────────────────────────
@router.post("/query", response_model=RAGResponse)
async def rag_query(
    request:      RAGRequest,
    current_user: dict = Depends(get_current_user),
):
    db        = Chroma(collection_name=request.collection, embedding_function=embeddings, persist_directory="./chroma")
    retriever = db.as_retriever(search_kwargs={"k": request.top_k})
    llm       = get_llm(request.model)

    qa_prompt = ChatPromptTemplate.from_messages([
        ("system", "پەقەت بىرىلگەن مەزمۇنغا ئاساسلانغان جاۋاپ بەر:\n{context}"),
        ("human",  "{input}"),
    ])

    chain  = create_retrieval_chain(retriever, create_stuff_documents_chain(llm, qa_prompt))
    result = await chain.ainvoke({"input": request.query})

    sources = []
    if request.with_sources:
        sources = [{
            "content": d.page_content[:200],
            "source":  d.metadata.get("source", "نامەلۇم"),
        } for d in result["context"]]

    return RAGResponse(answer=result["answer"], sources=sources, tokens=0)
08
Agent API نۇقتىسى
Agent Endpoint — Tool-calling Agent
قورال ئىشلىتىدىغان Agent — تور ئىزدەش، ھىسابلاش، ھۆججەت
PYTHONapp/api/v1/agent.py
from fastapi           import APIRouter, Depends
from pydantic          import BaseModel
from langchain.agents  import AgentExecutor, create_tool_calling_agent
from langchain.tools   import tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.prompts    import ChatPromptTemplate, MessagesPlaceholder
from app.services.llm_service  import get_llm
from app.models.schemas        import ModelChoice
from app.core.auth             import get_current_user

router = APIRouter()

class AgentRequest(BaseModel):
    task:  str
    model: ModelChoice = ModelChoice.GPT4O_MINI

# ── قوراللار تارىفى ──────────────────────────────────────
@tool
def calculate(expression: str) -> str:
    """ماتېماتىكىلىق ئىپادىنى ھىسابلا. مىسال: 2+2, sqrt(16)"""
    import math
    try:
        return str(eval(expression, {"__builtins__": {}}, vars(math)))
    except Exception as e:
        return f"خاتالىق: {e}"

tools = [DuckDuckGoSearchRun(), calculate]

@router.post("/run")
async def run_agent(
    request:      AgentRequest,
    current_user: dict = Depends(get_current_user),
):
    llm    = get_llm(request.model)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "سەن ئۇيغۇرچە ياردەمچىسەن. قورالدىن ئىشلىت."),
        ("human",  "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ])

    agent    = create_tool_calling_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools, max_iterations=5)

    result = await executor.ainvoke({"input": request.task})
    return {"output": result["output"]}
09
WebSocket — يىلتىز ئىككى تەرەپلىك
WebSocket — Real-time Bidirectional Chat
WebSocket ئارقىلىق يىلتىز سۆھبەت — چات ئاپپ ئۈچۈن
PYTHONapp/api/v1/websocket.py
from fastapi           import APIRouter, WebSocket, WebSocketDisconnect
from langchain_openai  import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import json

router = APIRouter()

# بارلىق باغلانغان ئىشلەتكۈچىلەرنى ساقلاش
connections: dict[str, WebSocket] = {}

@router.websocket("/ws/{session_id}")
async def websocket_chat(websocket: WebSocket, session_id: str):
    await websocket.accept()
    connections[session_id] = websocket

    llm      = ChatOpenAI(model="gpt-4o-mini", streaming=True)
    history  = [SystemMessage(content="سەن ئۇيغۇرچە ياردەمچىسەن.")]

    try:
        while True:
            # ئىشلەتكۈچىدىن سوئال قوبۇل قىلىش
            data    = await websocket.receive_text()
            payload = json.loads(data)
            history.append(HumanMessage(content=payload["message"]))

            # ئاقىملىق جاۋاپ يوللاش
            full_response = ""
            async for chunk in llm.astream(history):
                if chunk.content:
                    full_response += chunk.content
                    await websocket.send_json({
                        "type":    "chunk",
                        "content": chunk.content
                    })

            # تاماملاندى
            await websocket.send_json({"type": "done", "content": ""})
            history.append(AIMessage(content=full_response))

    except WebSocketDisconnect:
        connections.pop(session_id, None)
10
Docker ۋە يايدۇرۇش
Docker, Docker Compose, Deployment
Production دەرىجىدە يايدۇرۇش — Docker، Nginx، HTTPS
DOCKERFILEDockerfile
FROM python:3.12-slim

WORKDIR /app

# تەلەپلەر
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# كود
COPY . .

# Healthcheck
HEALTHCHECK --interval=30s --timeout=10s \
    CMD curl -f http://localhost:8000/health || exit 1

# ئىجرا
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
YAMLdocker-compose.yml
version: '3.9'

services:
  api:
    build: .
    ports: ["8000:8000"]
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - SECRET_KEY=${SECRET_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on: [redis]
    volumes:
      - ./chroma_db:/app/chroma_db
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes: [redis_data:/data]
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports: ["80:80", "443:443"]
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./certs:/etc/ssl/certs
    depends_on: [api]

volumes: {redis_data:}

تولۇق API نۇقتىلىرى خۇلاسىسى

نۇقتائۇسۇلئۇيغۇرچەAuth
/api/v1/auth/tokenPOSTJWT token ئالىشئاچىق
/api/v1/chat/completionsPOSTLLM جاۋاپ ئالىشJWT
/api/v1/stream/chatPOSTئاقىملىق SSE جاۋاپJWT
/api/v1/rag/uploadPOSTھۆججەت يوللاشJWT
/api/v1/rag/queryPOSTRAG سوئال-جاۋاپJWT
/api/v1/agent/runPOSTAgent ۋەزىپىسىJWT
/ws/{session_id}WSيىلتىز سۆھبەتJWT
/healthGETسىستېما ساغلاملىقىئاچىق
/docsGETSwagger UIئاچىق