← 返回首页
异步

异步编程模式

📂 python ⏱ 4 min 609 words

异步编程模式

异步编程是现代Python高性能应用的关键。本文将介绍几种重要的异步编程模式和最佳实践。

异步上下文管理器

import asyncio
import aiohttp
from contextlib import asynccontextmanager

# 基本异步上下文管理器
class AsyncDatabaseConnection:
    """Async context manager that simulates a database connection.

    Entering the context yields a plain dict describing the connection;
    leaving it clears the stored connection again. Exceptions raised inside
    the ``async with`` body are never suppressed.
    """

    def __init__(self, dsn):
        self.dsn = dsn
        # Stays None until the context has been entered.
        self.connection = None

    async def __aenter__(self):
        """Simulate connecting and return the connection dict."""
        print("建立数据库连接...")
        await asyncio.sleep(0.1)  # stand-in for real asynchronous connection setup
        handle = {"dsn": self.dsn, "connected": True}
        self.connection = handle
        return handle

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Simulate disconnecting and let any exception propagate."""
        print("关闭数据库连接...")
        await asyncio.sleep(0.1)  # stand-in for real asynchronous teardown
        self.connection = None
        # A falsy return value tells Python not to swallow the exception.
        return False

# 使用异步上下文管理器
async def use_database():
    """Open a simulated connection, print it, then report that it was closed."""
    database = AsyncDatabaseConnection("postgresql://localhost/db")
    async with database as conn:
        print(f"使用连接: {conn}")

    # Reached only after the context manager's exit step has finished.
    print("连接已关闭")

# 使用asynccontextmanager装饰器
@asynccontextmanager
async def async_timer(label):
    """Time the body of an ``async with`` block and print the elapsed seconds.

    Args:
        label: Text printed in front of the measured duration.

    The duration is printed even when the body raises, because the report
    lives in a ``finally`` clause.
    """
    # Inside a coroutine get_running_loop() is the recommended way to reach
    # the current loop; look it up once and reuse it for both readings.
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        yield
    finally:
        elapsed = loop.time() - start
        print(f"{label} 耗时: {elapsed:.3f}秒")

async def main():
    """Demonstrate async_timer by timing a half-second sleep."""
    timer = async_timer("异步操作")
    async with timer:
        await asyncio.sleep(0.5)

异步队列模式

import asyncio
import random
from typing import Any

class AsyncMessageQueue:
    """Thin wrapper around ``asyncio.Queue`` that can be marked as closed.

    After ``close()``, ``put()`` always raises ``QueueShutDown`` and ``get()``
    raises it once the queue is empty. Closing only sets a flag: a ``get()``
    that is already waiting on an empty queue is not woken up by ``close()``.

    Attributes:
        QueueShutDown: Exception raised for operations on a closed queue.
    """

    # asyncio.QueueShutDown only exists on Python 3.13+. On older
    # interpreters the bare attribute access would fail with AttributeError
    # instead of the intended error, so fall back to a local exception class.
    try:
        QueueShutDown = asyncio.QueueShutDown
    except AttributeError:
        class QueueShutDown(Exception):
            """Raised when putting into or getting from a closed queue."""

    def __init__(self, maxsize=0):
        self.queue = asyncio.Queue(maxsize=maxsize)
        self._closed = False

    async def put(self, item):
        """Add ``item``, waiting for a free slot if the queue is full.

        Raises:
            QueueShutDown: If the queue has been closed.
        """
        if self._closed:
            raise self.QueueShutDown
        await self.queue.put(item)

    async def get(self):
        """Remove and return the next item, waiting if none is available.

        Raises:
            QueueShutDown: If the queue has been closed and is empty.
        """
        # Items queued before close() can still be drained.
        if self._closed and self.queue.empty():
            raise self.QueueShutDown
        return await self.queue.get()

    def task_done(self):
        """Mark one previously fetched item as fully processed."""
        self.queue.task_done()

    async def join(self):
        """Wait until every queued item has been marked done."""
        await self.queue.join()

    def close(self):
        """Reject further ``put()`` calls and, once drained, ``get()`` calls."""
        self._closed = True

# 生产者-消费者模式
async def producer(queue: AsyncMessageQueue, producer_id: int):
    """Put five labelled items on the queue, pausing briefly after each one."""
    for i in range(5):
        item = f"商品-{producer_id}-{i}"
        await queue.put(item)
        print(f"生产者{producer_id}生产: {item}")
        # Random pause so that several producers interleave.
        pause = random.uniform(0.1, 0.3)
        await asyncio.sleep(pause)

async def consumer(queue: AsyncMessageQueue, consumer_id: int):
    """Process items from the queue until it stays empty for one second.

    Args:
        queue: Queue to read items from.
        consumer_id: Number used to label this consumer's output.

    Every fetched item is acknowledged with ``task_done()`` even if its
    processing is interrupted, so a pending ``queue.join()`` cannot hang on
    an item that would otherwise never be acknowledged.
    """
    while True:
        # Guard only the fetch: a timeout here means the queue was idle.
        try:
            item = await asyncio.wait_for(queue.get(), timeout=1.0)
        except asyncio.TimeoutError:
            break

        try:
            print(f"消费者{consumer_id}处理: {item}")
            await asyncio.sleep(random.uniform(0.2, 0.5))  # simulated processing
        finally:
            queue.task_done()

async def producer_consumer_example():
    """Run three producers and two consumers against one bounded queue."""
    queue = AsyncMessageQueue(maxsize=10)

    producers = []
    for producer_id in range(3):
        producers.append(asyncio.create_task(producer(queue, producer_id)))

    consumers = []
    for consumer_id in range(2):
        consumers.append(asyncio.create_task(consumer(queue, consumer_id)))

    # Wait for all items to be produced, then for all of them to be handled.
    await asyncio.gather(*producers)
    await queue.join()

    # The consumers are no longer needed; cancel them and collect the results
    # without letting a cancellation error escape.
    for worker in consumers:
        worker.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)

Actor模型

import asyncio
from dataclasses import dataclass
from typing import Dict, Any, Callable
from collections import deque

@dataclass
class Message:
    """Envelope for a single message passed between actors."""

    # Name of the actor that sent the message.
    sender: str
    # Name of the actor the message is addressed to.
    receiver: str
    # Payload; any value is accepted.
    content: Any

class Actor:
    """Base class for a named actor with an inbox and a registry of peers.

    Subclasses implement ``receive`` to handle messages. ``run`` keeps
    draining the inbox until ``stop`` is called.
    """

    def __init__(self, name: str):
        self.name = name
        self.inbox: asyncio.Queue = asyncio.Queue()
        self.peers: Dict[str, 'Actor'] = {}
        self._running = False

    def register_peer(self, peer: 'Actor'):
        """Make ``peer`` addressable by its name through ``send``."""
        self.peers[peer.name] = peer

    async def send(self, to: str, content: Any):
        """Put a message carrying ``content`` into the inbox of peer ``to``.

        A message addressed to a name that is not a registered peer is
        silently dropped.
        """
        if to not in self.peers:
            return
        msg = Message(sender=self.name, receiver=to, content=content)
        await self.peers[to].inbox.put(msg)

    async def receive(self, msg: 'Message'):
        """Handle one incoming message; subclasses must override this.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    async def run(self):
        """Dispatch inbox messages to ``receive`` until ``stop`` is called.

        Exceptions raised by ``receive`` propagate to the caller of ``run``.
        """
        self._running = True
        while self._running:
            # Guard only the fetch. A TimeoutError raised by receive() itself
            # is a real handler error and must not be mistaken for an idle
            # inbox.
            try:
                msg = await asyncio.wait_for(self.inbox.get(), timeout=0.1)
            except asyncio.TimeoutError:
                # Inbox idle: loop again so the _running flag is re-checked.
                continue
            await self.receive(msg)

    def stop(self):
        """Ask ``run`` to exit after its current iteration."""
        self._running = False

# 具体Actor实现
class CounterActor(Actor):
    """Actor that keeps a running count driven by incoming messages."""

    def __init__(self, name: str):
        super().__init__(name)
        self.count = 0

    async def receive(self, msg: Message):
        """Update or report the counter depending on the message content.

        ``"increment"`` adds one, ``"get_count"`` sends the current count back
        to the sender, and a dict containing ``"value"`` adds that value.
        Any other content is ignored.
        """
        if msg.content == "increment":
            self.count += 1
            print(f"{self.name}: 计数器={self.count}")
            return

        if msg.content == "get_count":
            await self.send(msg.sender, self.count)
            return

        is_delta = isinstance(msg.content, dict) and "value" in msg.content
        if is_delta:
            self.count += msg.content["value"]
            print(f"{self.name}: 增加{msg.content['value']},当前={self.count}")

class LoggerActor(Actor):
    """Actor that records and prints every message it receives."""

    def __init__(self, name: str):
        super().__init__(name)
        # Bounded log: only the 100 most recent entries are kept.
        self.logs: deque = deque(maxlen=100)
    
    async def receive(self, msg: Message):
        """Format the message as a log line, store it, and print it."""
        log_entry = f"[{msg.sender}] {msg.content}"
        self.logs.append(log_entry)
        print(f"日志: {log_entry}")

# 使用Actor系统
async def actor_example():
    """Wire up two counters and a logger, exchange messages, then shut down."""
    counter1 = CounterActor("counter1")
    counter2 = CounterActor("counter2")
    logger = LoggerActor("logger")
    actors = [counter1, counter2, logger]

    # Register every actor with every other actor, but not with itself.
    for actor in actors:
        for peer in actors:
            if actor is not peer:
                actor.register_peer(peer)

    # Start each actor's message loop.
    tasks = [asyncio.create_task(actor.run()) for actor in actors]

    # counter1 is not its own peer, so a message sent from counter1 to
    # "counter1" would be dropped by send(). Route the increment through
    # counter2 so that it is actually delivered.
    await counter2.send("counter1", "increment")
    await counter1.send("counter2", {"value": 5})
    await counter2.send("logger", "操作完成")

    # Give the actors time to process their inboxes.
    await asyncio.sleep(0.5)

    # Shut down: ask every loop to stop, then wait for the tasks to finish.
    for actor in actors:
        actor.stop()

    await asyncio.gather(*tasks, return_exceptions=True)

异步迭代器模式

class AsyncDataStream:
    """Single-pass asynchronous iterator over a list.

    The object is its own iterator, so once it has been exhausted it stays
    exhausted.
    """

    def __init__(self, data: list):
        self.data = data
        self.index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next item after a short pause, or end the iteration."""
        exhausted = self.index >= len(self.data)
        if exhausted:
            raise StopAsyncIteration

        await asyncio.sleep(0.1)  # stand-in for real asynchronous I/O
        position = self.index
        item = self.data[position]
        self.index = position + 1
        return item

async def process_stream():
    """Print each value produced by a five-element AsyncDataStream."""
    values = [1, 2, 3, 4, 5]
    async for item in AsyncDataStream(values):
        print(f"处理: {item}")

异步生成器

async def async_range(start, stop, step=1):
    """Asynchronously yield numbers from ``start`` up or down to ``stop``.

    Follows the conventions of the built-in ``range``: ``stop`` is exclusive,
    a positive ``step`` counts upwards, and a negative ``step`` counts
    downwards.

    Args:
        start: First value to yield.
        stop: Exclusive end of the sequence.
        step: Difference between consecutive values; must not be zero.

    Raises:
        ValueError: If ``step`` is zero. Because this is an async generator,
            the error surfaces on the first iteration, not at call time.
    """
    if step == 0:
        # A zero step never reaches ``stop`` and would loop forever.
        raise ValueError("async_range() step must not be zero")

    current = start
    while (current < stop) if step > 0 else (current > stop):
        yield current
        await asyncio.sleep(0.01)  # stand-in for real asynchronous work
        current += step

async def use_async_generator():
    """Show an async generator in an ``async for`` loop and a comprehension."""
    async for num in async_range(0, 10, 2):
        print(f"数字: {num}")

    # Asynchronous list comprehension: squares of the even numbers below 5.
    # Each value is squared so the result matches its name and printed label.
    squares = [num ** 2 async for num in async_range(0, 5) if num % 2 == 0]
    print(f"偶数平方: {squares}")

信号量控制并发

async def bounded_task(semaphore, task_id):
    """Run a one-second simulated job while holding a slot of ``semaphore``."""
    # Explicit acquire/release, the documented equivalent of
    # ``async with semaphore``.
    await semaphore.acquire()
    try:
        print(f"任务{task_id}开始执行")
        await asyncio.sleep(1)
        print(f"任务{task_id}完成")
    finally:
        semaphore.release()

async def semaphore_example():
    """Launch ten bounded tasks, of which at most three run at the same time."""
    limit = asyncio.Semaphore(3)  # concurrency cap
    jobs = []
    for task_id in range(10):
        jobs.append(bounded_task(limit, task_id))
    await asyncio.gather(*jobs)

最佳实践

  1. 避免阻塞操作:使用异步库或run_in_executor
  2. 合理使用队列:控制生产者-消费者速率
  3. 错误处理:使用try-except处理异步异常
  4. 资源清理:使用异步上下文管理器
  5. 监控性能:使用asyncio的调试模式

总结

异步编程模式为构建高性能Python应用提供了强大工具。掌握异步上下文管理器、队列、Actor模型等模式,可以设计出可扩展、易维护的异步系统。