← 返回首页
🐍

上下文管理器进阶

📂 python ⏱ 4 min 745 words

上下文管理器进阶

上下文管理器是Python中管理资源的重要机制,通过with语句确保资源的正确获取和释放。contextlib模块提供了创建和操作上下文管理器的高级工具,支持嵌套、异步和自定义实现。

基础上下文管理器

除了使用类实现__enter__和__exit__方法,contextlib提供了更简洁的方式创建上下文管理器。

from contextlib import contextmanager
import time

# 使用contextmanager装饰器
@contextmanager
def timer_context():
    """计时上下文管理器"""
    start = time.perf_counter()
    try:
        yield  # 暂停执行,将控制权交给with块
    finally:
        end = time.perf_counter()
        print(f"执行时间: {(end - start)*1000:.2f}ms")

# 使用上下文管理器
print("计时上下文管理器示例:")
with timer_context():
    # 模拟耗时操作
    total = sum(i * i for i in range(1000000))
    print(f"计算结果: {total}")

# 带参数的上下文管理器
@contextmanager
def managed_resource(resource_name, **kwargs):
    """带参数的资源管理上下文管理器"""
    print(f"获取资源: {resource_name}")
    resource = {"name": resource_name, "data": {}, **kwargs}
    
    try:
        yield resource  # 提供资源给with块
    except Exception as e:
        print(f"处理异常: {e}")
        resource["error"] = str(e)
    finally:
        print(f"释放资源: {resource_name}")

# 测试带参数的上下文管理器
print("\n带参数的资源管理:")
with managed_resource("数据库连接", timeout=30) as db:
    db["data"]["users"] = ["Alice", "Bob", "Charlie"]
    print(f"资源数据: {db['data']}")

嵌套上下文管理器

contextlib提供了多种方式来嵌套和组合上下文管理器,处理复杂的资源管理场景。

from contextlib import contextmanager, ExitStack
import time

# 嵌套上下文管理器
@contextmanager
def database_connection(name):
    """数据库连接上下文管理器"""
    print(f"建立数据库连接: {name}")
    connection = {"name": name, "connected": True}
    try:
        yield connection
    finally:
        connection["connected"] = False
        print(f"关闭数据库连接: {name}")

@contextmanager
def transaction(connection):
    """事务上下文管理器"""
    print(f"开始事务: {connection['name']}")
    try:
        yield connection
        print(f"提交事务: {connection['name']}")
    except Exception as e:
        print(f"回滚事务: {connection['name']}, 原因: {e}")
        raise

# 使用ExitStack管理多个上下文管理器
print("ExitStack嵌套示例:")
with ExitStack() as stack:
    # 动态创建多个上下文管理器
    connections = []
    for i in range(3):
        conn = stack.enter_context(database_connection(f"db_{i}"))
        connections.append(conn)
    
    # 所有连接都已建立
    print(f"已建立 {len(connections)} 个连接")
    
    # 在所有连接上执行操作
    for conn in connections:
        print(f"操作连接: {conn['name']}")
    
    # ExitStack退出时会自动关闭所有连接

# 上下文管理器组合
@contextmanager
def combined_context():
    """组合多个上下文管理器"""
    with database_connection("main_db") as db:
        with transaction(db) as txn:
            yield {"db": db, "transaction": txn}

print("\n组合上下文管理器:")
with combined_context() as ctx:
    print(f"数据库: {ctx['db']['name']}")
    print(f"事务状态: 进行中")

异步上下文管理器

Python 3.5+支持异步上下文管理器,使用async with语句管理异步资源,如网络连接、数据库连接等。

import asyncio
from contextlib import asynccontextmanager

# 异步上下文管理器
@asynccontextmanager
async def async_database_connection(connection_string):
    """异步数据库连接上下文管理器"""
    print(f"异步建立连接: {connection_string}")
    
    # 模拟异步连接建立
    await asyncio.sleep(0.1)
    connection = {"string": connection_string, "connected": True}
    
    try:
        yield connection
    finally:
        # 模拟异步连接关闭
        await asyncio.sleep(0.1)
        connection["connected"] = False
        print(f"异步关闭连接: {connection_string}")

# 异步资源管理
async def async_resource_example():
    print("异步资源管理示例:")
    
    async with async_database_connection("postgresql://localhost/mydb") as conn:
        print(f"连接状态: {conn['connected']}")
        
        # 模拟异步查询
        await asyncio.sleep(0.1)
        result = {"users": ["Alice", "Bob"]}
        print(f"查询结果: {result}")
    
    print(f"连接已关闭: {not conn['connected']}")

# 运行异步示例
# asyncio.run(async_resource_example())

# 异步ExitStack
from contextlib import AsyncExitStack

async def async_multiple_resources():
    """管理多个异步资源"""
    async with AsyncExitStack() as stack:
        connections = []
        for i in range(3):
            conn = await stack.enter_async_context(
                async_database_connection(f"async_db_{i}")
            )
            connections.append(conn)
        
        print(f"已建立 {len(connections)} 个异步连接")
        
        # 并发操作
        async def use_connection(conn):
            await asyncio.sleep(0.1)
            return f"使用 {conn['string']}"
        
        tasks = [use_connection(conn) for conn in connections]
        results = await asyncio.gather(*tasks)
        print(f"并发结果: {results}")

# asyncio.run(async_multiple_resources())

自定义上下文管理器

通过实现__enter__和__exit__方法,可以创建更复杂的上下文管理器,支持异常处理、状态管理和资源清理。

import time
from typing import Any, Optional

class Timer:
    """计时上下文管理器类"""
    
    def __init__(self, name: str = "代码块"):
        self.name = name
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None
    
    def __enter__(self):
        self.start_time = time.perf_counter()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end_time = time.perf_counter()
        
        if exc_type is not None:
            print(f"计时器 {self.name} 检测到异常: {exc_val}")
            return False  # 不抑制异常
        
        return False  # 不抑制异常
    
    @property
    def elapsed(self) -> float:
        """获取经过的时间"""
        if self.start_time is None:
            return 0.0
        end = self.end_time or time.perf_counter()
        return end - self.start_time
    
    def __repr__(self) -> str:
        if self.end_time:
            return f"Timer({self.name}: {self.elapsed*1000:.2f}ms)"
        return f"Timer({self.name}: 运行中)"

# 使用自定义计时器
print("自定义计时器示例:")
with Timer("列表推导") as t1:
    result1 = [i ** 2 for i in range(1000000)]

with Timer("map函数") as t2:
    result2 = list(map(lambda x: x ** 2, range(1000000)))

print(f"列表推导: {t1.elapsed*1000:.2f}ms")
print(f"map函数: {t2.elapsed*1000:.2f}ms")
print(f"性能差异: {(t1.elapsed - t2.elapsed)*1000:.2f}ms")

# 带状态管理的上下文管理器
class ConnectionPool:
    """连接池上下文管理器"""
    
    def __init__(self, max_connections: int = 5):
        self.max_connections = max_connections
        self.connections = []
        self.available = []
        self.in_use = 0
    
    def __enter__(self):
        print(f"初始化连接池,最大连接数: {self.max_connections}")
        # 预创建连接
        for i in range(self.max_connections):
            conn = {"id": i, "active": True}
            self.connections.append(conn)
            self.available.append(conn)
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"关闭连接池,释放 {len(self.connections)} 个连接")
        self.connections.clear()
        self.available.clear()
        self.in_use = 0
        return False
    
    def acquire(self):
        """获取连接"""
        if not self.available:
            raise RuntimeError("没有可用连接")
        
        conn = self.available.pop()
        self.in_use += 1
        return conn
    
    def release(self, conn):
        """释放连接"""
        if conn not in self.connections:
            raise ValueError("无效连接")
        
        self.available.append(conn)
        self.in_use -= 1
    
    @property
    def stats(self):
        """获取连接池统计信息"""
        return {
            "total": len(self.connections),
            "available": len(self.available),
            "in_use": self.in_use
        }

# 测试连接池
print("\n连接池示例:")
with ConnectionPool(max_connections=3) as pool:
    print(f"初始状态: {pool.stats}")
    
    # 获取连接
    conn1 = pool.acquire()
    conn2 = pool.acquire()
    print(f"获取2个连接后: {pool.stats}")
    
    # 释放连接
    pool.release(conn1)
    print(f"释放1个连接后: {pool.stats}")
    
    # 获取更多连接
    conn3 = pool.acquire()
    conn4 = pool.acquire()
    print(f"获取更多连接后: {pool.stats}")
    
    # 释放所有连接
    pool.release(conn2)
    pool.release(conn3)
    pool.release(conn4)
    print(f"释放所有连接后: {pool.stats}")

实际应用:日志和监控

上下文管理器非常适合实现日志记录、性能监控和错误处理等横切关注点。

import time
import functools
from typing import Callable, Any

class PerformanceMonitor:
    """性能监控上下文管理器"""
    
    def __init__(self, operation_name: str, log_results: bool = True):
        self.operation_name = operation_name
        self.log_results = log_results
        self.start_time = None
        self.metrics = {}
    
    def __enter__(self):
        self.start_time = time.perf_counter()
        if self.log_results:
            print(f"开始执行: {self.operation_name}")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        end_time = time.perf_counter()
        self.metrics['duration'] = end_time - self.start_time
        self.metrics['success'] = exc_type is None
        
        if exc_type is not None:
            self.metrics['error'] = str(exc_val)
        
        if self.log_results:
            status = "成功" if self.metrics['success'] else "失败"
            print(f"{self.operation_name} {status}: {self.metrics['duration']*1000:.2f}ms")
            
            if not self.metrics['success']:
                print(f"错误: {self.metrics['error']}")
        
        return False
    
    def record_metric(self, key: str, value: Any):
        """记录自定义指标"""
        self.metrics[key] = value

# 装饰器版本
def monitored(operation_name: str):
    """性能监控装饰器"""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with PerformanceMonitor(operation_name) as monitor:
                result = func(*args, **kwargs)
                monitor.record_metric('result_size', len(str(result)))
                return result
        return wrapper
    return decorator

# 使用性能监控
print("性能监控示例:")

with PerformanceMonitor("数据处理") as monitor:
    # 模拟数据处理
    data = [i ** 2 for i in range(100000)]
    monitor.record_metric('data_size', len(data))
    
    # 处理数据
    processed = [x * 2 for x in data]
    monitor.record_metric('processed_size', len(processed))

print(f"监控指标: {monitor.metrics}")

# 使用装饰器版本
@monitored("排序算法")
def sort_data(data):
    return sorted(data, reverse=True)

test_data = list(range(10000))
sorted_data = sort_data(test_data)

上下文管理器是Python中管理资源、处理异常和实现横切关注点的强大工具。通过掌握contextlib模块的高级用法,开发者可以创建更安全、更可读的代码,确保资源得到正确管理,同时简化复杂的控制流逻辑。