异步编程模式
异步编程模式
异步编程是现代Python高性能应用的关键。本文将介绍几种重要的异步编程模式和最佳实践。
异步上下文管理器
import asyncio
import aiohttp
from contextlib import asynccontextmanager
# 基本异步上下文管理器
class AsyncDatabaseConnection:
def __init__(self, dsn):
self.dsn = dsn
self.connection = None
async def __aenter__(self):
print("建立数据库连接...")
# 模拟异步连接
await asyncio.sleep(0.1)
self.connection = {"dsn": self.dsn, "connected": True}
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("关闭数据库连接...")
await asyncio.sleep(0.1)
self.connection = None
return False # 不抑制异常
# 使用异步上下文管理器
async def use_database():
async with AsyncDatabaseConnection("postgresql://localhost/db") as conn:
print(f"使用连接: {conn}")
print("连接已关闭")
# 使用asynccontextmanager装饰器
@asynccontextmanager
async def async_timer(label):
start = asyncio.get_event_loop().time()
try:
yield
finally:
elapsed = asyncio.get_event_loop().time() - start
print(f"{label} 耗时: {elapsed:.3f}秒")
async def main():
async with async_timer("异步操作"):
await asyncio.sleep(0.5)
异步队列模式
import asyncio
import random
from typing import Any
class AsyncMessageQueue:
def __init__(self, maxsize=0):
self.queue = asyncio.Queue(maxsize=maxsize)
self._closed = False
async def put(self, item):
if self._closed:
raise asyncio.QueueShutDown
await self.queue.put(item)
async def get(self):
if self._closed and self.queue.empty():
raise asyncio.QueueShutDown
return await self.queue.get()
def task_done(self):
self.queue.task_done()
async def join(self):
await self.queue.join()
def close(self):
self._closed = True
# 生产者-消费者模式
async def producer(queue: AsyncMessageQueue, producer_id: int):
for i in range(5):
item = f"商品-{producer_id}-{i}"
await queue.put(item)
print(f"生产者{producer_id}生产: {item}")
await asyncio.sleep(random.uniform(0.1, 0.3))
async def consumer(queue: AsyncMessageQueue, consumer_id: int):
while True:
try:
item = await asyncio.wait_for(queue.get(), timeout=1.0)
print(f"消费者{consumer_id}处理: {item}")
await asyncio.sleep(random.uniform(0.2, 0.5)) # 模拟处理
queue.task_done()
except asyncio.TimeoutError:
break
async def producer_consumer_example():
queue = AsyncMessageQueue(maxsize=10)
producers = [asyncio.create_task(producer(queue, i)) for i in range(3)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(2)]
await asyncio.gather(*producers)
await queue.join()
for c in consumers:
c.cancel()
await asyncio.gather(*consumers, return_exceptions=True)
Actor模型
import asyncio
from dataclasses import dataclass
from typing import Dict, Any, Callable
from collections import deque
@dataclass
class Message:
sender: str
receiver: str
content: Any
class Actor:
def __init__(self, name: str):
self.name = name
self.inbox: asyncio.Queue = asyncio.Queue()
self.peers: Dict[str, 'Actor'] = {}
self._running = False
def register_peer(self, peer: 'Actor'):
self.peers[peer.name] = peer
async def send(self, to: str, content: Any):
if to in self.peers:
msg = Message(sender=self.name, receiver=to, content=content)
await self.peers[to].inbox.put(msg)
async def receive(self, msg: Message):
raise NotImplementedError
async def run(self):
self._running = True
while self._running:
try:
msg = await asyncio.wait_for(self.inbox.get(), timeout=0.1)
await self.receive(msg)
except asyncio.TimeoutError:
continue
def stop(self):
self._running = False
# 具体Actor实现
class CounterActor(Actor):
def __init__(self, name: str):
super().__init__(name)
self.count = 0
async def receive(self, msg: Message):
if msg.content == "increment":
self.count += 1
print(f"{self.name}: 计数器={self.count}")
elif msg.content == "get_count":
await self.send(msg.sender, self.count)
elif isinstance(msg.content, dict) and "value" in msg.content:
self.count += msg.content["value"]
print(f"{self.name}: 增加{msg.content['value']},当前={self.count}")
class LoggerActor(Actor):
def __init__(self, name: str):
super().__init__(name)
self.logs: deque = deque(maxlen=100)
async def receive(self, msg: Message):
log_entry = f"[{msg.sender}] {msg.content}"
self.logs.append(log_entry)
print(f"日志: {log_entry}")
# 使用Actor系统
async def actor_example():
counter1 = CounterActor("counter1")
counter2 = CounterActor("counter2")
logger = LoggerActor("logger")
# 注册对等节点
for actor in [counter1, counter2, logger]:
for peer in [counter1, counter2, logger]:
if actor != peer:
actor.register_peer(peer)
# 启动actors
tasks = [
asyncio.create_task(counter1.run()),
asyncio.create_task(counter2.run()),
asyncio.create_task(logger.run())
]
# 交互
await counter1.send("counter1", "increment")
await counter1.send("counter2", {"value": 5})
await counter2.send("logger", "操作完成")
await asyncio.sleep(0.5)
# 清理
for actor in [counter1, counter2, logger]:
actor.stop()
await asyncio.gather(*tasks, return_exceptions=True)
异步迭代器模式
class AsyncDataStream:
def __init__(self, data: list):
self.data = data
self.index = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.data):
raise StopAsyncIteration
await asyncio.sleep(0.1) # 模拟异步IO
item = self.data[self.index]
self.index += 1
return item
async def process_stream():
stream = AsyncDataStream([1, 2, 3, 4, 5])
async for item in stream:
print(f"处理: {item}")
异步生成器
async def async_range(start, stop, step=1):
current = start
while current < stop:
yield current
await asyncio.sleep(0.01) # 模拟异步操作
current += step
async def use_async_generator():
async for num in async_range(0, 10, 2):
print(f"数字: {num}")
# 异步生成器表达式
squares = [num async for num in async_range(0, 5) if num % 2 == 0]
print(f"偶数平方: {squares}")
信号量控制并发
async def bounded_task(semaphore, task_id):
async with semaphore:
print(f"任务{task_id}开始执行")
await asyncio.sleep(1)
print(f"任务{task_id}完成")
async def semaphore_example():
semaphore = asyncio.Semaphore(3) # 最多3个并发
tasks = [bounded_task(semaphore, i) for i in range(10)]
await asyncio.gather(*tasks)
最佳实践
- 避免阻塞操作:使用异步库或run_in_executor
- 合理使用队列:控制生产者-消费者速率
- 错误处理:使用try-except处理异步异常
- 资源清理:使用异步上下文管理器
- 监控性能:使用asyncio的调试模式
总结
异步编程模式为构建高性能Python应用提供了强大工具。掌握异步上下文管理器、队列、actor模型等模式,可以设计出可扩展、易维护的异步系统。