微服务架构
微服务架构
微服务架构将应用拆分为小型、独立的服务。本文将介绍Python微服务的设计原则、基于FastAPI的服务实现、API网关、服务发现、gRPC通信、消息队列集成、容器化部署以及监控和日志。
服务拆分原则
# 单体应用 vs 微服务
# 单体:所有功能在一个进程中
# 微服务:每个功能是独立服务
# 服务拆分示例
# 用户服务
class UserService:
    """User-domain service that forwards every operation to its own database."""

    def __init__(self):
        # The service holds its own database handle.
        self.db = UserDatabase()

    def create_user(self, user_data):
        """Store ``user_data`` and return whatever the database layer returns."""
        created = self.db.create(user_data)
        return created

    def get_user(self, user_id):
        """Fetch ``user_id`` and return whatever the database layer returns."""
        found = self.db.get(user_id)
        return found
# 订单服务
class OrderService:
    """Order-domain service that checks the user via the user service before writing."""

    def __init__(self):
        self.db = OrderDatabase()
        self.user_client = UserServiceClient()

    def create_order(self, order_data):
        """Create an order after confirming that its user exists.

        Raises:
            ValueError: if the user service returns a falsy result for
                ``order_data["user_id"]``.
        """
        # Cross-service call: the user is looked up through the client,
        # not in this service's own database.
        buyer = self.user_client.get_user(order_data["user_id"])
        if buyer:
            return self.db.create(order_data)
        raise ValueError("用户不存在")
# 服务边界划分
# 1. 单一职责:每个服务只做一件事
# 2. 数据库独立:每个服务有自己的数据库
# 3. 可独立部署:服务可以独立升级和扩展
FastAPI微服务
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import uvicorn
import httpx
# Application object for the user service; the route decorators below attach to it.
app = FastAPI(title="用户服务")
# Data models
# Request body accepted by POST /users.
class UserCreate(BaseModel):
    name: str
    email: str
    password: str
# Response shape used as ``response_model`` by the user endpoints.
# It has no ``password`` field.
class UserResponse(BaseModel):
    id: int
    name: str
    email: str
# Request body accepted by PUT /users/{user_id}; both fields default to None.
class UserUpdate(BaseModel):
    name: Optional[str] = None
    email: Optional[str] = None
# Simulated database: an in-process dict keyed by user id, plus the next id to assign.
users_db = {}
user_id_counter = 1
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    """Create a user and return the stored record.

    Raises:
        HTTPException: 400 when another stored user already has the same email.
    """
    global user_id_counter

    # Reject duplicates before anything is written.
    email_taken = any(
        existing["email"] == user.email for existing in users_db.values()
    )
    if email_taken:
        raise HTTPException(status_code=400, detail="邮箱已存在")

    new_id = user_id_counter
    record = user.dict()
    # NOTE(review): ``record`` keeps the password exactly as submitted; a real
    # service should store a salted hash instead.
    record["id"] = new_id
    users_db[new_id] = record
    user_id_counter = new_id + 1
    return record
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """Return the stored record for ``user_id``.

    Raises:
        HTTPException: 404 when no such user is stored.
    """
    try:
        return users_db[user_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="用户不存在") from None
@app.get("/users", response_model=list[UserResponse])
async def list_users(skip: int = 0, limit: int = 100):
    """Return one window of the stored users in insertion order.

    Args:
        skip: Number of users to skip from the start; negative values are
            treated as 0.
        limit: Maximum number of users to return; negative values are
            treated as 0.
    """
    # Clamp both values. A negative number would otherwise be read as a
    # Python negative slice index and silently return the wrong window.
    start = max(skip, 0)
    count = max(limit, 0)
    users = list(users_db.values())
    return users[start:start + count]
@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_update: UserUpdate):
    """Apply a partial update to a stored user and return the updated record.

    Only fields that were sent with a non-null value are changed.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the new
            email already belongs to a different user.
    """
    if user_id not in users_db:
        raise HTTPException(status_code=404, detail="用户不存在")
    user_data = users_db[user_id]

    # An explicit ``null`` must not overwrite a required field: the record
    # would then fail validation against UserResponse (name/email are ``str``).
    changes = {
        field: value
        for field, value in user_update.dict(exclude_unset=True).items()
        if value is not None
    }

    # Keep the same uniqueness rule that create_user enforces.
    new_email = changes.get("email")
    if new_email is not None and any(
        other_id != user_id and other["email"] == new_email
        for other_id, other in users_db.items()
    ):
        raise HTTPException(status_code=400, detail="邮箱已存在")

    user_data.update(changes)
    return user_data
@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: int):
    """Remove the stored record for ``user_id``.

    Raises:
        HTTPException: 404 when no such user is stored.
    """
    try:
        del users_db[user_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="用户不存在") from None
# Health check
@app.get("/health")
async def health_check():
    """Return a fixed payload reporting the service as healthy."""
    payload = {"status": "healthy"}
    return payload
# Start the service: when run as a script, serve the app on all interfaces, port 8001.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8001)
API网关
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import httpx
from typing import Dict
import time
app = FastAPI(title="API网关")

# Service registry: service name -> base URL. Entries can be added or replaced
# at runtime through POST /gateway/register.
services: Dict[str, str] = {
    "users": "http://user-service:8001",
    "orders": "http://order-service:8002",
    "products": "http://product-service:8003"
}

# Route map: request path prefix -> name of the service that handles it.
routes: Dict[str, str] = {
    "/api/users": "users",
    "/api/orders": "orders",
    "/api/products": "products"
}
# CORS: any origin may call the gateway, but without credentials.
# Wildcard origins must not be combined with allow_credentials=True, because
# that would let any website issue authenticated requests on a user's behalf.
# To allow cookies or Authorization headers, replace "*" with an explicit
# list of trusted origins and only then turn allow_credentials back on.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.middleware("http")
async def gateway_middleware(request: Request, call_next):
    """Forward requests under a configured route prefix to the backing service.

    Requests whose path matches no entry in ``routes`` go to the gateway's
    own handlers via ``call_next``. Matching requests are replayed against
    the service's base URL with the same method, path, query string, and body.
    The upstream status, body, and headers are returned together with
    ``X-Process-Time`` and ``X-Service``.

    When the service is not registered or cannot be reached, a 503 JSON
    response is returned.
    """
    # Local imports: the gateway's import block only brings in FastAPI,
    # Request and HTTPException.
    from fastapi import Response
    from fastapi.responses import JSONResponse

    # Headers that describe a single connection hop and must not be relayed.
    hop_by_hop = {
        "connection", "keep-alive", "proxy-authenticate",
        "proxy-authorization", "te", "trailer", "transfer-encoding", "upgrade",
    }
    # Host, Content-Length and Accept-Encoding are left for the HTTP client
    # to set for the upstream request.
    request_skip = hop_by_hop | {"host", "content-length", "accept-encoding"}
    # The client has already decoded the body, and the Response object
    # computes its own length.
    response_skip = hop_by_hop | {"content-length", "content-encoding"}

    start_time = time.time()

    # Route matching: the prefix must end on a path-segment boundary so that
    # "/api/usersX" is not sent to the users service.
    path = request.url.path
    service_name = None
    for route, service in routes.items():
        if path == route or path.startswith(route + "/"):
            service_name = service
            break
    if not service_name:
        return await call_next(request)

    unavailable = {"detail": "服务不可用"}
    service_url = services.get(service_name)
    if service_url is None:
        return JSONResponse(status_code=503, content=unavailable)

    # NOTE(review): the path is forwarded unchanged (including the "/api"
    # prefix) — confirm the backing services actually serve under that prefix.
    target = f"{service_url}{path}"
    if request.url.query:
        target = f"{target}?{request.url.query}"
    outgoing_headers = {
        key: value
        for key, value in request.headers.items()
        if key.lower() not in request_skip
    }

    try:
        async with httpx.AsyncClient() as client:
            upstream = await client.request(
                method=request.method,
                url=target,
                headers=outgoing_headers,
                content=await request.body(),
                timeout=5.0,
            )
    except httpx.RequestError:
        # An exception raised here would bypass FastAPI's exception handlers
        # and surface as a 500, so the 503 is returned as a response instead.
        return JSONResponse(status_code=503, content=unavailable)

    proxied = Response(
        content=upstream.content,
        status_code=upstream.status_code,
    )
    # multi_items() keeps repeated headers (e.g. Set-Cookie) as separate lines.
    for key, value in upstream.headers.multi_items():
        if key.lower() not in response_skip:
            proxied.headers.append(key, value)
    proxied.headers["X-Process-Time"] = str(time.time() - start_time)
    proxied.headers["X-Service"] = service_name
    return proxied
# Service health check
@app.get("/gateway/health")
async def gateway_health():
    """Probe ``/health`` on every registered service and report the results.

    Returns:
        ``{"services": {name: bool}}`` where the value is True only when the
        probe answered with HTTP 200 within 2 seconds.
    """
    health_status = {}
    # Snapshot the registry first: a registration arriving while a probe is
    # awaited would otherwise change the dict during iteration and raise
    # RuntimeError.
    targets = list(services.items())
    async with httpx.AsyncClient() as client:
        for service_name, service_url in targets:
            try:
                response = await client.get(
                    f"{service_url}/health",
                    timeout=2.0
                )
            except Exception:
                # Deliberately broad: any failure to probe counts as unhealthy.
                health_status[service_name] = False
            else:
                health_status[service_name] = response.status_code == 200
    return {"services": health_status}
# Service registration
@app.post("/gateway/register")
async def register_service(name: str, url: str):
    """Add or replace a registry entry.

    Args:
        name: Service name; must not be blank.
        url: Base URL of the service; must be an absolute http(s) URL.
            A trailing slash is removed so that joining it with a request
            path does not produce "//".

    Raises:
        HTTPException: 400 when the name is blank or the URL is not an
            absolute http(s) URL.
    """
    from urllib.parse import urlsplit

    # NOTE(review): this endpoint is unauthenticated, so any caller can
    # redirect gateway traffic. Restrict it before exposing it publicly.
    parts = urlsplit(url)
    if not name.strip() or parts.scheme not in ("http", "https") or not parts.netloc:
        raise HTTPException(status_code=400, detail="服务名称或地址无效")

    services[name] = url.rstrip("/")
    return {"message": f"服务 {name} 已注册"}
# When run as a script, serve the gateway on all interfaces, port 8000.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
服务发现
import redis
import json
import time
from typing import Dict, List
import threading
class ServiceDiscovery:
    """Service registry stored in Redis.

    Each service name maps to one Redis hash (``service:<name>``) whose
    fields are instance URLs and whose values are JSON records holding the
    URL, the last registration/heartbeat time, and the instance's TTL.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.Redis.from_url(redis_url)
        self.services: Dict[str, List[str]] = {}
        self._running = False
        self._thread = None
        # Signalled by stop_heartbeat() so the loop wakes up immediately
        # instead of finishing a full sleep interval.
        self._stop_event = threading.Event()

    def register(self, service_name: str, service_url: str, ttl: int = 30):
        """Register (or overwrite) one instance of ``service_name``.

        Args:
            service_name: Logical service name.
            service_url: URL of this instance; used as the hash field.
            ttl: Seconds the entry stays valid without a heartbeat.
        """
        key = f"service:{service_name}"
        self.redis.hset(key, service_url, json.dumps({
            "url": service_url,
            "registered_at": time.time(),
            "ttl": ttl
        }))
        # NOTE(review): the expiry applies to the whole hash, i.e. to all
        # instances of this service, not just this one.
        self.redis.expire(key, ttl)

    def deregister(self, service_name: str, service_url: str):
        """Remove one instance of ``service_name`` from the registry."""
        key = f"service:{service_name}"
        self.redis.hdel(key, service_url)

    def discover(self, service_name: str) -> List[str]:
        """Return the URLs of all instances whose TTL has not elapsed."""
        key = f"service:{service_name}"
        now = time.time()
        result = []
        for service_url, data in self.redis.hgetall(key).items():
            service_data = json.loads(data)
            # Per-instance TTL check; stale records are skipped, not deleted.
            if now - service_data["registered_at"] < service_data["ttl"]:
                result.append(service_url.decode())
        return result

    def heartbeat(self, service_name: str, service_url: str, ttl: "int | None" = None):
        """Refresh the registration time of an existing instance.

        Does nothing when the instance is not (or no longer) registered.

        Args:
            service_name: Logical service name.
            service_url: URL of the instance to refresh.
            ttl: Expiry in seconds to apply to the hash. When omitted, the
                TTL stored at registration is reused, so a heartbeat no
                longer shortens or lengthens a custom TTL to 30 seconds.
        """
        key = f"service:{service_name}"
        data = self.redis.hget(key, service_url)
        if data:
            service_data = json.loads(data)
            service_data["registered_at"] = time.time()
            self.redis.hset(key, service_url, json.dumps(service_data))
            effective_ttl = service_data.get("ttl", 30) if ttl is None else ttl
            self.redis.expire(key, effective_ttl)

    def start_heartbeat(self, service_name: str, service_url: str, interval: int = 10):
        """Start a daemon thread that sends a heartbeat every ``interval`` seconds.

        A heartbeat thread that is already running is stopped first, so
        repeated calls do not accumulate threads.
        """
        if self._thread is not None and self._thread.is_alive():
            self.stop_heartbeat()

        # Each loop owns its event, so a later restart cannot revive an old loop.
        stop_event = threading.Event()

        def heartbeat_loop():
            import logging

            log = logging.getLogger(__name__)
            while self._running and not stop_event.is_set():
                try:
                    self.heartbeat(service_name, service_url)
                except redis.RedisError:
                    # A transient Redis failure must not end the loop;
                    # the next tick retries.
                    log.warning(
                        "heartbeat for %s at %s failed",
                        service_name, service_url, exc_info=True,
                    )
                # wait() returns True as soon as stop is requested.
                if stop_event.wait(interval):
                    break

        self._stop_event = stop_event
        self._running = True
        self._thread = threading.Thread(target=heartbeat_loop, daemon=True)
        self._thread.start()

    def stop_heartbeat(self):
        """Ask the heartbeat thread to stop and wait up to 5 seconds for it."""
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
# Using service discovery
class ServiceClient:
    """HTTP client that resolves its target through a ServiceDiscovery instance."""

    def __init__(self, discovery: ServiceDiscovery):
        self.discovery = discovery

    async def call_service(self, service_name: str, method: str, path: str, **kwargs):
        """Send one request to a randomly chosen instance of ``service_name``.

        Args:
            service_name: Name to look up in the registry.
            method: HTTP method.
            path: Path appended to the chosen instance URL.
            **kwargs: Passed through to the HTTP client's ``request``.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: when discovery returns no instance for the service.
        """
        candidates = self.discovery.discover(service_name)
        if candidates:
            # Simple load balancing: pick an instance at random.
            import random
            base_url = random.choice(candidates)
            target = f"{base_url}{path}"
            async with httpx.AsyncClient() as http:
                reply = await http.request(method=method, url=target, **kwargs)
                return reply.json()
        raise Exception(f"服务 {service_name} 不可用")
gRPC服务
// user.proto
// NOTE(review): the Python servicer in this article imports users_pb2 and
// users_pb2_grpc; protoc derives those names from the .proto file name, so
// this file presumably has to be called users.proto — confirm.
syntax = "proto3";
package users;

// User management RPCs.
service UserService {
  // Unary: fetch one user by id.
  rpc GetUser (GetUserRequest) returns (UserResponse);
  // Unary: create a user and return it.
  rpc CreateUser (CreateUserRequest) returns (UserResponse);
  // Server streaming: one UserResponse message per user.
  rpc ListUsers (ListUsersRequest) returns (stream UserResponse);
}

message GetUserRequest {
  int32 id = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  string password = 3;
}

// User as returned to clients; carries no password field.
message UserResponse {
  int32 id = 1;
  string name = 2;
  string email = 3;
}

// Paging parameters; proto3 scalars default to 0 when the client omits them.
message ListUsersRequest {
  int32 page = 1;
  int32 page_size = 2;
}
# gRPC服务实现
import grpc
from concurrent import futures
import users_pb2
import users_pb2_grpc
class UserServicer(users_pb2_grpc.UserServiceServicer):
    """In-memory implementation of the ``UserService`` gRPC service."""

    # Page size used when the request leaves ``page_size`` unset (0) or
    # negative; it matches the default ``limit`` of the REST list endpoint.
    DEFAULT_PAGE_SIZE = 100

    def __init__(self):
        import threading

        self.users = {}
        self.counter = 1
        # The server runs handlers on a thread pool, so id allocation and
        # access to ``users`` must be serialised.
        self._lock = threading.Lock()

    @staticmethod
    def _to_response(user):
        """Convert a stored user dict into a ``UserResponse`` message."""
        return users_pb2.UserResponse(
            id=user["id"],
            name=user["name"],
            email=user["email"]
        )

    def GetUser(self, request, context):
        """Return the user with ``request.id``; abort with NOT_FOUND if absent."""
        with self._lock:
            user = self.users.get(request.id)
        if user is None:
            # abort() raises, so nothing below runs for a missing user.
            context.abort(grpc.StatusCode.NOT_FOUND, "用户不存在")
        return self._to_response(user)

    def CreateUser(self, request, context):
        """Store a new user under the next free id and return it.

        The password in the request is not stored.
        """
        with self._lock:
            user_data = {
                "id": self.counter,
                "name": request.name,
                "email": request.email
            }
            self.users[self.counter] = user_data
            self.counter += 1
        return self._to_response(user_data)

    def ListUsers(self, request, context):
        """Stream one page of users in insertion order.

        ``request.page`` is 1-based; values below 1 (including the proto3
        default of 0) are treated as page 1. A ``request.page_size`` below 1
        falls back to ``DEFAULT_PAGE_SIZE``. Previously page 0 produced a
        negative slice start and returned the wrong users.
        """
        page = request.page if request.page > 0 else 1
        page_size = (
            request.page_size if request.page_size > 0 else self.DEFAULT_PAGE_SIZE
        )
        with self._lock:
            # Snapshot, so streaming does not hold the lock.
            users = list(self.users.values())
        start = (page - 1) * page_size
        for user in users[start:start + page_size]:
            yield self._to_response(user)
def serve():
    """Start the gRPC server on port 50051 and block until it terminates."""
    pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(pool)
    servicer = UserServicer()
    users_pb2_grpc.add_UserServiceServicer_to_server(servicer, server)
    # Plaintext listener on all interfaces.
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()
# When run as a script, start the gRPC server.
if __name__ == "__main__":
    serve()
消息队列集成
import pika
import json
import threading
from typing import Callable
class MessageQueue:
    """Thin wrapper around a blocking RabbitMQ connection and one channel.

    The connection must only be used from one thread at a time.
    ``start_consuming_async`` therefore opens its own connection for the
    background consumer instead of sharing this one.
    """

    def __init__(self, host: str = "localhost"):
        # Remembered so background consumers can open their own connection.
        self._host = host
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()
        # Queue name -> background consumer thread.
        self._consumers = {}

    def close(self):
        """Close the connection if it is still open."""
        if self.connection.is_open:
            self.connection.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def declare_queue(self, queue_name: str):
        """Declare ``queue_name`` as a durable queue."""
        self.channel.queue_declare(queue=queue_name, durable=True)

    def publish(self, queue_name: str, message: dict):
        """Publish ``message`` as JSON to ``queue_name`` via the default exchange.

        The queue must already be declared; the default exchange drops
        messages whose routing key matches no queue.
        """
        self.channel.basic_publish(
            exchange="",
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # persistent message
            )
        )

    def consume(self, queue_name: str, callback: Callable):
        """Consume ``queue_name`` on this connection; blocks the calling thread.

        Each message is decoded from JSON and passed to ``callback``:

        * on success the message is acknowledged;
        * a body that is not valid JSON is rejected without requeue, since
          redelivering it can never succeed;
        * if ``callback`` raises, the message is returned to the queue and
          the exception propagates, which stops consuming.
        """
        import logging

        log = logging.getLogger(__name__)

        def wrapper(ch, method, properties, body):
            try:
                message = json.loads(body)
            except ValueError:
                log.error("dropping undecodable message from %s", queue_name)
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                return
            try:
                callback(message)
            except Exception:
                # Hand the message back explicitly instead of leaving it
                # unacknowledged until the connection closes.
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
                raise
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=wrapper
        )
        self.channel.start_consuming()

    def start_consuming_async(self, queue_name: str, callback: Callable):
        """Consume ``queue_name`` in a daemon thread and return that thread.

        The thread opens a dedicated connection to the same host, so the
        caller can keep using this instance without sharing a connection
        across threads. ``callback`` runs in the background thread.
        """
        def consume_in_thread():
            worker = MessageQueue(self._host)
            try:
                worker.consume(queue_name, callback)
            finally:
                worker.close()

        thread = threading.Thread(target=consume_in_thread, daemon=True)
        self._consumers[queue_name] = thread
        thread.start()
        return thread
# Using the message queue
class OrderEventHandler:
    """Reacts to order events and notifies the inventory service."""

    def __init__(self):
        self.mq = MessageQueue()
        self.mq.declare_queue("order_created")
        self.mq.declare_queue("order_completed")
        # handle_order_created publishes here. Without the declaration the
        # default exchange would silently drop those messages.
        self.mq.declare_queue("inventory_update")

    def handle_order_created(self, message):
        """Handle an order-created event.

        Args:
            message: Decoded event; must contain ``order_id`` and ``items``.
        """
        order_id = message["order_id"]
        # Send the confirmation e-mail (printed in this example).
        print(f"发送订单确认邮件: 订单{order_id}")
        # Notify the inventory service.
        self.mq.publish("inventory_update", {
            "order_id": order_id,
            "items": message["items"]
        })

    def start(self):
        """Start handling ``order_created`` events in the background."""
        self.mq.start_consuming_async(
            "order_created",
            self.handle_order_created
        )
容器化部署
# docker-compose.yml
# One container per service, one database per service, plus Redis.
# NOTE(review): credentials are inline for the example; use secrets or an
# env file for anything beyond local development.
version: '3.8'

services:
  api-gateway:
    build: ./api-gateway
    ports:
      - "8000:8000"
    depends_on:
      - user-service
      - order-service
      - product-service
      # The gateway is configured with REDIS_URL, so Redis must start first.
      - redis
    environment:
      - REDIS_URL=redis://redis:6379

  user-service:
    build: ./user-service
    ports:
      - "8001:8001"
    environment:
      - DATABASE_URL=postgresql://user:pass@user-db:5432/users
    depends_on:
      - user-db

  order-service:
    build: ./order-service
    ports:
      - "8002:8002"
    environment:
      - DATABASE_URL=postgresql://user:pass@order-db:5432/orders
      - USER_SERVICE_URL=http://user-service:8001
    depends_on:
      - order-db
      - user-service

  product-service:
    build: ./product-service
    ports:
      - "8003:8003"
    environment:
      - DATABASE_URL=postgresql://user:pass@product-db:5432/products
    depends_on:
      - product-db

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  user-db:
    image: postgres:15
    environment:
      POSTGRES_DB: users
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass

  order-db:
    image: postgres:15
    environment:
      POSTGRES_DB: orders
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass

  product-db:
    image: postgres:15
    environment:
      POSTGRES_DB: products
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
监控和日志
import logging
import json
from datetime import datetime
from typing import Any
class StructuredLogger:
    """Logger that writes one JSON object per record.

    Every line is a complete JSON document with ``service``, ``level``,
    ``message``, ``timestamp`` and any extra fields passed to ``log``.
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)

        # A fresh logger inherits the root level (WARNING by default), which
        # would silently drop INFO records. A level set elsewhere is kept.
        if self.logger.level == logging.NOTSET:
            self.logger.setLevel(logging.INFO)

        # getLogger() returns the same object for the same name, so only
        # attach a handler once; otherwise every record is printed repeatedly.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            # The message is already a JSON document. Wrapping it in another
            # JSON template would nest unescaped quotes and yield invalid JSON.
            handler.setFormatter(logging.Formatter("%(message)s"))
            self.logger.addHandler(handler)

    def log(self, level: str, message: str, **kwargs):
        """Emit one structured record.

        Args:
            level: Name of a logger method, case-insensitive
                (e.g. "INFO", "error").
            message: Human-readable message.
            **kwargs: Extra fields merged into the record. They override the
                built-in fields on a name clash.

        Raises:
            AttributeError: if ``level`` does not name a logger method.
        """
        from datetime import timezone

        log_data = {
            "service": self.service_name,
            "level": level.upper(),
            "message": message,
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **kwargs
        }
        # ensure_ascii=False keeps non-ASCII messages readable. default=str is
        # deliberate: a non-serialisable extra field must not break the caller.
        payload = json.dumps(log_data, ensure_ascii=False, default=str)
        getattr(self.logger, level.lower())(payload)
# Usage: one module-level structured logger for the user service.
logger = StructuredLogger("user-service")
def create_user(user_data):
    """Save a user, logging the attempt and its outcome.

    Returns:
        Whatever ``save_user`` returns.

    Raises:
        Exception: any error raised while saving or logging success is
            logged at ERROR level and re-raised unchanged.
    """
    requested_id = user_data.get("id")
    logger.log("INFO", "创建用户", user_id=requested_id)
    try:
        # Business logic
        saved = save_user(user_data)
        logger.log("INFO", "用户创建成功", user_id=saved.id)
        return saved
    except Exception as exc:
        failure = str(exc)
        logger.log("ERROR", "用户创建失败", error=failure)
        raise
最佳实践
- 服务自治:每个服务独立开发、部署、扩展
- API优先:定义清晰的API契约
- 容错设计:实现熔断、重试、降级
- 分布式追踪:使用Jaeger或Zipkin跟踪请求
- 配置管理:使用配置中心管理服务配置
总结
微服务架构提供了灵活的扩展和部署能力。通过合理拆分服务、实现API网关、服务发现和消息队列,可以构建可扩展、可维护的分布式系统。