← 返回首页
分布式

微服务架构

📂 python ⏱ 6 min 1150 words

微服务架构

微服务架构将应用拆分为小型、独立的服务。本文将介绍Python微服务的设计原则、基于FastAPI的服务实现、API网关、服务发现、gRPC通信、消息队列集成、容器化部署以及监控和日志。

服务拆分原则

# 单体应用 vs 微服务
# 单体:所有功能在一个进程中
# 微服务:每个功能是独立服务

# 服务拆分示例
# 用户服务
class UserService:
    """User service: owns user persistence through its own database handle."""

    def __init__(self):
        self.db = UserDatabase()

    def create_user(self, user_data):
        """Store ``user_data`` and return whatever the database layer returns."""
        created = self.db.create(user_data)
        return created

    def get_user(self, user_id):
        """Look up a user by id and return the database layer's result."""
        found = self.db.get(user_id)
        return found

# 订单服务
class OrderService:
    """Order service: owns order persistence and consults the user service."""

    def __init__(self):
        self.db = OrderDatabase()
        self.user_client = UserServiceClient()

    def create_order(self, order_data):
        """Create an order after confirming the referenced user exists.

        Raises:
            ValueError: if the user service returns a falsy result for
                ``order_data["user_id"]``.
        """
        # Validate the buyer through the user service before persisting.
        buyer = self.user_client.get_user(order_data["user_id"])
        if buyer:
            return self.db.create(order_data)
        raise ValueError("用户不存在")

# 服务边界划分
# 1. 单一职责:每个服务只做一件事
# 2. 数据库独立:每个服务有自己的数据库
# 3. 可独立部署:服务可以独立升级和扩展

FastAPI微服务

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import uvicorn
import httpx

# ASGI application object for the user service; the route decorators below attach to it.
app = FastAPI(title="用户服务")

# 数据模型
# Request body accepted by POST /users: all three fields are required strings.
class UserCreate(BaseModel):
    name: str
    email: str
    password: str

# Response shape used as response_model by the user endpoints; it has no
# password field, so the password is not part of serialized responses.
class UserResponse(BaseModel):
    id: int
    name: str
    email: str

# Request body accepted by PUT /users/{user_id}: every field is optional so a
# client can send only the fields it wants to change.
class UserUpdate(BaseModel):
    name: Optional[str] = None
    email: Optional[str] = None

# Simulated database: users_db maps user id -> stored user dict, and
# user_id_counter holds the id that the next created user will receive.
users_db = {}
user_id_counter = 1

@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    """Create a user in the in-memory store and return the stored record.

    Raises:
        HTTPException: 400 when another stored user already has this email.
    """
    global user_id_counter

    # Reject duplicate emails before allocating an id.
    email_taken = any(
        existing["email"] == user.email for existing in users_db.values()
    )
    if email_taken:
        raise HTTPException(status_code=400, detail="邮箱已存在")

    new_id = user_id_counter
    record = {**user.dict(), "id": new_id}
    users_db[new_id] = record
    user_id_counter = new_id + 1

    return record

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """Return the stored user with the given id.

    Raises:
        HTTPException: 404 when no user with that id exists.
    """
    try:
        return users_db[user_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="用户不存在") from None

@app.get("/users", response_model=list[UserResponse])
async def list_users(skip: int = 0, limit: int = 100):
    """Return one window of stored users, in insertion order."""
    window = slice(skip, skip + limit)
    return list(users_db.values())[window]

@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_update: UserUpdate):
    """Apply a partial update to a stored user and return the updated record.

    Only fields the client actually sent with a non-null value are applied.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the new
            email already belongs to a different user.
    """
    if user_id not in users_db:
        raise HTTPException(status_code=404, detail="用户不存在")

    # exclude_none: an explicit null must not overwrite name/email, which
    # UserResponse declares as required strings.
    update_data = user_update.dict(exclude_unset=True, exclude_none=True)

    # Keep emails unique on update as well, mirroring the check in create_user.
    new_email = update_data.get("email")
    if new_email is not None and any(
        other_id != user_id and other["email"] == new_email
        for other_id, other in users_db.items()
    ):
        raise HTTPException(status_code=400, detail="邮箱已存在")

    user_data = users_db[user_id]
    user_data.update(update_data)

    return user_data

@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: int):
    """Remove the user with the given id from the in-memory store.

    Raises:
        HTTPException: 404 when no user with that id exists.
    """
    # Stored records are dicts, so None reliably means "no such id".
    if users_db.pop(user_id, None) is None:
        raise HTTPException(status_code=404, detail="用户不存在")

# Health check
@app.get("/health")
async def health_check():
    """Report the service as healthy; takes no input and checks nothing else."""
    return dict(status="healthy")

# Start the service when run as a script: bind to all interfaces on port 8001.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8001)

API网关

from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import httpx
from typing import Dict
import time

# ASGI application object for the gateway.
app = FastAPI(title="API网关")

# Service registry: service name -> base URL that requests are forwarded to.
# Mutated at runtime by the /gateway/register endpoint.
services: Dict[str, str] = {
    "users": "http://user-service:8001",
    "orders": "http://order-service:8002",
    "products": "http://product-service:8003"
}

# Route table: request path prefix -> service name (a key of `services`).
routes = {
    "/api/users": "users",
    "/api/orders": "orders",
    "/api/products": "products"
}

# CORS: any origin may call the gateway, but without credentials. Wildcard
# origins must not be combined with allow_credentials=True; to allow cookies
# or Authorization headers cross-origin, list the trusted origins explicitly
# and only then enable credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.middleware("http")
async def gateway_middleware(request: Request, call_next):
    """Reverse-proxy requests under a registered route prefix to their service.

    Requests whose path matches no prefix in ``routes`` are handed to the
    gateway's own handlers via ``call_next``. Matching requests are forwarded
    (method, path, query string, headers and body) to the base URL registered
    in ``services``; the upstream status, body and content type are relayed
    back with ``X-Process-Time`` and ``X-Service`` headers added.

    Returns:
        The relayed upstream response, or a 503 JSON response when the target
        service is not registered or cannot be reached.
    """
    # Imported here because the module-level fastapi import does not bring
    # the response classes into scope.
    from fastapi.responses import JSONResponse, Response

    start_time = time.time()
    path = request.url.path

    # Match on a path-segment boundary so "/api/users" does not also capture
    # unrelated paths such as "/api/users-export".
    service_name = next(
        (
            service
            for route, service in routes.items()
            if path == route or path.startswith(f"{route}/")
        ),
        None,
    )
    if service_name is None:
        return await call_next(request)

    service_url = services.get(service_name)
    if service_url is None:
        return JSONResponse(status_code=503, content={"detail": "服务不可用"})

    # NOTE(review): the path is forwarded unchanged, route prefix included, so
    # the upstream service must serve the same path — confirm against it.
    upstream_url = f"{service_url}{path}"
    if request.url.query:
        upstream_url = f"{upstream_url}?{request.url.query}"

    # Host and Content-Length describe the inbound hop; let httpx derive the
    # correct values for the upstream request.
    forward_headers = {
        key: value
        for key, value in request.headers.items()
        if key.lower() not in ("host", "content-length")
    }
    body = await request.body()

    try:
        async with httpx.AsyncClient() as client:
            response = await client.request(
                method=request.method,
                url=upstream_url,
                headers=forward_headers,
                content=body,
                timeout=5.0,
            )
    except httpx.RequestError:
        # An HTTPException raised from middleware bypasses FastAPI's exception
        # handlers and surfaces as a 500, so build the 503 response directly.
        return JSONResponse(status_code=503, content={"detail": "服务不可用"})

    # Extra response headers for tracing.
    process_time = time.time() - start_time
    headers = {
        "X-Process-Time": str(process_time),
        "X-Service": service_name,
    }

    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=headers,
        # Preserve the upstream content type so clients can parse the body.
        media_type=response.headers.get("content-type"),
    )

# Service health check
@app.get("/gateway/health")
async def gateway_health():
    """Probe ``/health`` on every registered service and report the results.

    Returns:
        ``{"services": {name: bool}}`` where the bool is True only when the
        service answered its health endpoint with HTTP 200 within 2 seconds.
    """
    health_status = {}

    # Iterate over a snapshot: /gateway/register may add entries to `services`
    # while this coroutine is suspended in an await, and mutating a dict
    # during iteration raises RuntimeError.
    registered = list(services.items())

    async with httpx.AsyncClient() as client:
        for service_name, service_url in registered:
            try:
                response = await client.get(
                    f"{service_url}/health",
                    timeout=2.0
                )
            except Exception:
                # Best-effort probe: any failure means "unhealthy".
                health_status[service_name] = False
            else:
                health_status[service_name] = response.status_code == 200

    return {"services": health_status}

# Service registration
@app.post("/gateway/register")
async def register_service(name: str, url: str):
    """Add or overwrite the base URL registered for ``name``."""
    services.update({name: url})
    confirmation = f"服务 {name} 已注册"
    return {"message": confirmation}

# Start the gateway when run as a script: bind to all interfaces on port 8000.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

服务发现

import redis
import json
import time
from typing import Dict, List
import threading

class ServiceDiscovery:
    """Redis-backed service registry with TTL-based liveness.

    Each service name maps to one Redis hash (``service:<name>``). Its fields
    are instance URLs and its values are JSON records holding the URL, the
    time of the last registration or heartbeat, and the TTL in seconds.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.Redis.from_url(redis_url)
        self.services: Dict[str, List[str]] = {}
        self._running = False
        self._thread = None
        # Set on shutdown to wake the heartbeat thread out of its wait.
        self._stop_event = threading.Event()

    def register(self, service_name: str, service_url: str, ttl: int = 30):
        """Register one instance URL under ``service_name``.

        NOTE(review): the Redis expiry is set on the whole hash, so it is
        shared by every instance of the service; per-instance liveness is
        decided in ``discover`` from each record's own timestamp and TTL.
        """
        key = f"service:{service_name}"
        self.redis.hset(key, service_url, json.dumps({
            "url": service_url,
            "registered_at": time.time(),
            "ttl": ttl
        }))
        self.redis.expire(key, ttl)

    def deregister(self, service_name: str, service_url: str):
        """Remove one instance URL from ``service_name``."""
        key = f"service:{service_name}"
        self.redis.hdel(key, service_url)

    def discover(self, service_name: str) -> List[str]:
        """Return the instance URLs whose record is still within its TTL."""
        key = f"service:{service_name}"
        services = self.redis.hgetall(key)

        now = time.time()
        result = []
        for service_url, data in services.items():
            service_data = json.loads(data)
            # Skip instances whose last registration/heartbeat is too old.
            if now - service_data["registered_at"] < service_data["ttl"]:
                result.append(service_url.decode())

        return result

    def heartbeat(self, service_name: str, service_url: str, ttl: int = 30):
        """Refresh the timestamp of an existing registration.

        Does nothing when the instance is not currently registered.
        """
        key = f"service:{service_name}"
        data = self.redis.hget(key, service_url)
        if data:
            service_data = json.loads(data)
            service_data["registered_at"] = time.time()
            self.redis.hset(key, service_url, json.dumps(service_data))
            self.redis.expire(key, ttl)

    def start_heartbeat(self, service_name: str, service_url: str,
                        interval: int = 10, ttl: int = 30):
        """Start a daemon thread that sends a heartbeat every ``interval`` seconds.

        Args:
            service_name: Service whose registration is kept alive.
            service_url: Instance URL whose record is refreshed.
            interval: Seconds between heartbeats.
            ttl: Expiry, in seconds, passed to each ``heartbeat`` call.
        """
        import logging

        # Stop any earlier heartbeat thread so repeated calls do not leak one.
        self.stop_heartbeat()

        stop_event = threading.Event()
        log = logging.getLogger(__name__)

        def heartbeat_loop():
            while self._running and not stop_event.is_set():
                try:
                    self.heartbeat(service_name, service_url, ttl)
                except redis.RedisError:
                    # A transient Redis failure must not end the heartbeat
                    # thread; report it and retry on the next tick.
                    log.warning(
                        "heartbeat for %s (%s) failed",
                        service_name, service_url, exc_info=True,
                    )
                # Event.wait returns as soon as stop_heartbeat() sets the
                # event, so shutdown does not wait out a full interval.
                stop_event.wait(interval)

        self._stop_event = stop_event
        self._running = True
        self._thread = threading.Thread(target=heartbeat_loop, daemon=True)
        self._thread.start()

    def stop_heartbeat(self):
        """Signal the heartbeat thread to stop and wait up to 5 seconds for it."""
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)

# 使用服务发现
class ServiceClient:
    """HTTP client that resolves its target through a ``ServiceDiscovery``."""

    def __init__(self, discovery: ServiceDiscovery):
        self.discovery = discovery

    async def call_service(self, service_name: str, method: str, path: str, **kwargs):
        """Send one HTTP request to a randomly chosen live instance.

        Extra keyword arguments are passed through to the httpx request.

        Returns:
            The response body decoded as JSON.

        Raises:
            Exception: if discovery returns no instance for ``service_name``.
        """
        candidates = self.discovery.discover(service_name)
        if not candidates:
            raise Exception(f"服务 {service_name} 不可用")

        # Simple load balancing: pick one instance at random.
        import random
        base_url = random.choice(candidates)
        target = f"{base_url}{path}"

        async with httpx.AsyncClient() as http:
            reply = await http.request(method=method, url=target, **kwargs)
            return reply.json()

gRPC服务

// user.proto
// gRPC contract for the user service (proto3, package "users").
syntax = "proto3";

package users;

// GetUser and CreateUser are unary calls; ListUsers streams its results,
// one UserResponse message per user.
service UserService {
    rpc GetUser (GetUserRequest) returns (UserResponse);
    rpc CreateUser (CreateUserRequest) returns (UserResponse);
    rpc ListUsers (ListUsersRequest) returns (stream UserResponse);
}

// Identifies the user to fetch.
message GetUserRequest {
    int32 id = 1;
}

// Fields supplied when creating a user.
message CreateUserRequest {
    string name = 1;
    string email = 2;
    string password = 3;
}

// A user as returned to callers; there is no password field.
message UserResponse {
    int32 id = 1;
    string name = 2;
    string email = 3;
}

// Pagination parameters for ListUsers. In proto3 an unset int32 reads as 0.
message ListUsersRequest {
    int32 page = 1;
    int32 page_size = 2;
}
# gRPC服务实现
import grpc
from concurrent import futures
import users_pb2
import users_pb2_grpc

class UserServicer(users_pb2_grpc.UserServiceServicer):
    """In-memory implementation of the ``UserService`` gRPC service.

    Handlers may run concurrently on the server's thread pool, so access to
    the user table and the id counter is guarded by a lock.
    """

    # Page size used when a ListUsers request leaves page_size unset
    # (proto3 reads an unset int32 as 0) or sends a non-positive value.
    DEFAULT_PAGE_SIZE = 100

    def __init__(self):
        import threading  # local import keeps this block self-contained

        self.users = {}
        self.counter = 1
        self._lock = threading.Lock()

    def GetUser(self, request, context):
        """Return the user with ``request.id`` or abort with NOT_FOUND."""
        with self._lock:
            user = self.users.get(request.id)
        if user is None:
            # abort() raises, so nothing below runs for a missing user.
            context.abort(grpc.StatusCode.NOT_FOUND, "用户不存在")

        return users_pb2.UserResponse(
            id=user["id"],
            name=user["name"],
            email=user["email"]
        )

    def CreateUser(self, request, context):
        """Store a new user under the next id and return it."""
        # Allocate the id and insert atomically so concurrent calls can
        # neither reuse an id nor overwrite each other's record.
        with self._lock:
            user_data = {
                "id": self.counter,
                "name": request.name,
                "email": request.email
            }
            self.users[self.counter] = user_data
            self.counter += 1

        return users_pb2.UserResponse(
            id=user_data["id"],
            name=user_data["name"],
            email=user_data["email"]
        )

    def ListUsers(self, request, context):
        """Stream one page of users.

        Pages are 1-based. A ``page`` below 1 is treated as page 1, and a
        non-positive ``page_size`` falls back to ``DEFAULT_PAGE_SIZE``, so a
        request with unset fields returns the first page instead of nothing.
        """
        page = max(request.page, 1)
        page_size = request.page_size if request.page_size > 0 else self.DEFAULT_PAGE_SIZE

        # Snapshot under the lock; stream from the copy without holding it.
        with self._lock:
            users = list(self.users.values())

        start = (page - 1) * page_size
        end = start + page_size

        for user in users[start:end]:
            yield users_pb2.UserResponse(
                id=user["id"],
                name=user["name"],
                email=user["email"]
            )

def serve():
    """Run the gRPC user service on port 50051 until the server terminates.

    Uses a 10-worker thread pool and an insecure (plaintext) port.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    users_pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), grpc_server)

    grpc_server.add_insecure_port("[::]:50051")
    grpc_server.start()
    grpc_server.wait_for_termination()

# Start the gRPC server when run as a script.
if __name__ == "__main__":
    serve()

消息队列集成

import pika
import json
import threading
from typing import Callable

class MessageQueue:
    """Small wrapper around one blocking pika connection and channel."""

    def __init__(self, host: str = "localhost"):
        params = pika.ConnectionParameters(host)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self._consumers = {}

    def declare_queue(self, queue_name: str):
        """Declare ``queue_name`` as a durable queue."""
        self.channel.queue_declare(queue=queue_name, durable=True)

    def publish(self, queue_name: str, message: dict):
        """Publish ``message`` as JSON to ``queue_name`` via the default exchange."""
        payload = json.dumps(message)
        # delivery_mode=2 marks the message as persistent.
        persistent = pika.BasicProperties(delivery_mode=2)
        self.channel.basic_publish(
            exchange="",
            routing_key=queue_name,
            body=payload,
            properties=persistent,
        )

    def consume(self, queue_name: str, callback: Callable):
        """Consume ``queue_name``, blocking the calling thread.

        Each body is decoded from JSON and passed to ``callback``; the
        message is acknowledged only after ``callback`` returns.
        """
        def on_message(ch, method, properties, body):
            callback(json.loads(body))
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=on_message,
        )
        self.channel.start_consuming()

    def start_consuming_async(self, queue_name: str, callback: Callable):
        """Run ``consume`` in a daemon thread and return that thread."""
        worker = threading.Thread(
            target=self.consume,
            args=(queue_name, callback),
            daemon=True,
        )
        worker.start()
        return worker

# 使用消息队列
class OrderEventHandler:
    """Consume order events and fan them out to downstream queues."""

    # Every queue this handler consumes from or publishes to. The default
    # exchange drops messages routed to a queue that does not exist, so the
    # publish target "inventory_update" must be declared as well.
    QUEUES = ("order_created", "order_completed", "inventory_update")

    def __init__(self):
        self.mq = MessageQueue()
        for queue_name in self.QUEUES:
            self.mq.declare_queue(queue_name)

    def handle_order_created(self, message):
        """Handle an order-created event.

        Args:
            message: Decoded event; must contain ``order_id`` and ``items``.

        Raises:
            KeyError: if a required field is missing. Fields are read before
                any side effect, so a malformed event triggers neither the
                confirmation nor the inventory notification.
        """
        order_id = message["order_id"]
        items = message["items"]

        # Send the confirmation email (printed as a placeholder here).
        print(f"发送订单确认邮件: 订单{order_id}")

        # Notify the inventory service.
        self.mq.publish("inventory_update", {
            "order_id": order_id,
            "items": items
        })

    def start(self):
        """Start consuming ``order_created`` events in a background thread."""
        self.mq.start_consuming_async(
            "order_created",
            self.handle_order_created
        )

容器化部署

# docker-compose.yml
# One gateway, three backend services, one Postgres instance per service,
# and a shared Redis.
version: '3.8'

services:
  # Gateway: published on host port 8000 and listed to start after the
  # three backend services.
  api-gateway:
    build: ./api-gateway
    ports:
      - "8000:8000"
    depends_on:
      - user-service
      - order-service
      - product-service
    environment:
      - REDIS_URL=redis://redis:6379

  # User service: port 8001, backed by its own database (user-db).
  user-service:
    build: ./user-service
    ports:
      - "8001:8001"
    environment:
      - DATABASE_URL=postgresql://user:pass@user-db:5432/users
    depends_on:
      - user-db

  # Order service: port 8002, backed by order-db; it also receives the
  # user service's URL through USER_SERVICE_URL.
  order-service:
    build: ./order-service
    ports:
      - "8002:8002"
    environment:
      - DATABASE_URL=postgresql://user:pass@order-db:5432/orders
      - USER_SERVICE_URL=http://user-service:8001
    depends_on:
      - order-db
      - user-service

  # Product service: port 8003, backed by product-db.
  product-service:
    build: ./product-service
    ports:
      - "8003:8003"
    environment:
      - DATABASE_URL=postgresql://user:pass@product-db:5432/products
    depends_on:
      - product-db

  # Redis, published on host port 6379.
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  # NOTE(review): the database credentials below (and in the DATABASE_URL
  # values above) are hard-coded and no volumes are declared — presumably
  # demo-only; confirm before using this file beyond local development.
  user-db:
    image: postgres:15
    environment:
      POSTGRES_DB: users
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass

  order-db:
    image: postgres:15
    environment:
      POSTGRES_DB: orders
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass

  product-db:
    image: postgres:15
    environment:
      POSTGRES_DB: products
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass

监控和日志

import logging
import json
from datetime import datetime
from typing import Any

class StructuredLogger:
    """Emit one JSON object per log line for a named service."""

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)

        # logging.getLogger returns the same logger for the same name, so
        # attach the handler only once; otherwise every additional
        # StructuredLogger for this service would duplicate each line.
        already_attached = any(
            getattr(existing, "_structured_json", False)
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.StreamHandler()
            # The message is already a complete JSON document (see log()),
            # so emit it verbatim. Wrapping it in a JSON-looking template
            # would embed unescaped quotes and produce invalid JSON.
            handler.setFormatter(logging.Formatter("%(message)s"))
            handler._structured_json = True
            self.logger.addHandler(handler)

        # With no level set, the logger inherits the root default (WARNING)
        # and INFO records are dropped. Respect a level configured elsewhere.
        if self.logger.level == logging.NOTSET:
            self.logger.setLevel(logging.INFO)

    def log(self, level: str, message: str, **kwargs):
        """Write one structured record.

        Args:
            level: Name of a logger method, case-insensitive
                (e.g. ``"INFO"``, ``"error"``).
            message: Human-readable message.
            **kwargs: Extra fields merged into the record; they may override
                the built-in fields.

        Raises:
            AttributeError: if ``level`` does not name a logger method.
        """
        from datetime import timezone

        log_data = {
            "service": self.service_name,
            "level": level.upper(),
            "message": message,
            # Timezone-aware UTC timestamp (the naive utcnow() is deprecated).
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **kwargs
        }

        # ensure_ascii=False keeps non-ASCII text readable in the output.
        # default=str is deliberate: a non-serializable extra field should
        # degrade to its string form rather than make logging raise.
        payload = json.dumps(log_data, ensure_ascii=False, default=str)
        getattr(self.logger, level.lower())(payload)

# 使用
logger = StructuredLogger("user-service")

def create_user(user_data):
    """Save a user, logging the attempt, the success and any failure.

    Any exception raised while saving (or while logging the success) is
    logged at ERROR level and re-raised unchanged.
    """
    logger.log("INFO", "创建用户", user_id=user_data.get("id"))

    try:
        # Business logic
        saved = save_user(user_data)
        logger.log("INFO", "用户创建成功", user_id=saved.id)
    except Exception as exc:
        logger.log("ERROR", "用户创建失败", error=str(exc))
        raise
    return saved

最佳实践

  1. 服务自治:每个服务独立开发、部署、扩展
  2. API优先:定义清晰的API契约
  3. 容错设计:实现熔断、重试、降级
  4. 分布式追踪:使用Jaeger或Zipkin跟踪请求
  5. 配置管理:使用配置中心管理服务配置

总结

微服务架构提供了灵活的扩展和部署能力。通过合理拆分服务、实现API网关、服务发现和消息队列,可以构建可扩展、可维护的分布式系统。