上下文管理器进阶
上下文管理器进阶
上下文管理器是Python中管理资源的重要机制,通过with语句确保资源的正确获取和释放。contextlib模块提供了创建和操作上下文管理器的高级工具,支持嵌套、异步和自定义实现。
基础上下文管理器
除了使用类实现__enter__和__exit__方法,contextlib提供了更简洁的方式创建上下文管理器。
from contextlib import contextmanager
import time
# 使用contextmanager装饰器
@contextmanager
def timer_context():
"""计时上下文管理器"""
start = time.perf_counter()
try:
yield # 暂停执行,将控制权交给with块
finally:
end = time.perf_counter()
print(f"执行时间: {(end - start)*1000:.2f}ms")
# 使用上下文管理器
print("计时上下文管理器示例:")
with timer_context():
# 模拟耗时操作
total = sum(i * i for i in range(1000000))
print(f"计算结果: {total}")
# 带参数的上下文管理器
@contextmanager
def managed_resource(resource_name, **kwargs):
"""带参数的资源管理上下文管理器"""
print(f"获取资源: {resource_name}")
resource = {"name": resource_name, "data": {}, **kwargs}
try:
yield resource # 提供资源给with块
except Exception as e:
print(f"处理异常: {e}")
resource["error"] = str(e)
finally:
print(f"释放资源: {resource_name}")
# 测试带参数的上下文管理器
print("\n带参数的资源管理:")
with managed_resource("数据库连接", timeout=30) as db:
db["data"]["users"] = ["Alice", "Bob", "Charlie"]
print(f"资源数据: {db['data']}")
嵌套上下文管理器
contextlib提供了多种方式来嵌套和组合上下文管理器,处理复杂的资源管理场景。
from contextlib import contextmanager, ExitStack
import time
# 嵌套上下文管理器
@contextmanager
def database_connection(name):
"""数据库连接上下文管理器"""
print(f"建立数据库连接: {name}")
connection = {"name": name, "connected": True}
try:
yield connection
finally:
connection["connected"] = False
print(f"关闭数据库连接: {name}")
@contextmanager
def transaction(connection):
"""事务上下文管理器"""
print(f"开始事务: {connection['name']}")
try:
yield connection
print(f"提交事务: {connection['name']}")
except Exception as e:
print(f"回滚事务: {connection['name']}, 原因: {e}")
raise
# 使用ExitStack管理多个上下文管理器
print("ExitStack嵌套示例:")
with ExitStack() as stack:
# 动态创建多个上下文管理器
connections = []
for i in range(3):
conn = stack.enter_context(database_connection(f"db_{i}"))
connections.append(conn)
# 所有连接都已建立
print(f"已建立 {len(connections)} 个连接")
# 在所有连接上执行操作
for conn in connections:
print(f"操作连接: {conn['name']}")
# ExitStack退出时会自动关闭所有连接
# 上下文管理器组合
@contextmanager
def combined_context():
"""组合多个上下文管理器"""
with database_connection("main_db") as db:
with transaction(db) as txn:
yield {"db": db, "transaction": txn}
print("\n组合上下文管理器:")
with combined_context() as ctx:
print(f"数据库: {ctx['db']['name']}")
print(f"事务状态: 进行中")
异步上下文管理器
Python 3.5+支持异步上下文管理器,使用async with语句管理异步资源,如网络连接、数据库连接等。
import asyncio
from contextlib import asynccontextmanager
# 异步上下文管理器
@asynccontextmanager
async def async_database_connection(connection_string):
"""异步数据库连接上下文管理器"""
print(f"异步建立连接: {connection_string}")
# 模拟异步连接建立
await asyncio.sleep(0.1)
connection = {"string": connection_string, "connected": True}
try:
yield connection
finally:
# 模拟异步连接关闭
await asyncio.sleep(0.1)
connection["connected"] = False
print(f"异步关闭连接: {connection_string}")
# 异步资源管理
async def async_resource_example():
print("异步资源管理示例:")
async with async_database_connection("postgresql://localhost/mydb") as conn:
print(f"连接状态: {conn['connected']}")
# 模拟异步查询
await asyncio.sleep(0.1)
result = {"users": ["Alice", "Bob"]}
print(f"查询结果: {result}")
print(f"连接已关闭: {not conn['connected']}")
# 运行异步示例
# asyncio.run(async_resource_example())
# 异步ExitStack
from contextlib import AsyncExitStack
async def async_multiple_resources():
"""管理多个异步资源"""
async with AsyncExitStack() as stack:
connections = []
for i in range(3):
conn = await stack.enter_async_context(
async_database_connection(f"async_db_{i}")
)
connections.append(conn)
print(f"已建立 {len(connections)} 个异步连接")
# 并发操作
async def use_connection(conn):
await asyncio.sleep(0.1)
return f"使用 {conn['string']}"
tasks = [use_connection(conn) for conn in connections]
results = await asyncio.gather(*tasks)
print(f"并发结果: {results}")
# asyncio.run(async_multiple_resources())
自定义上下文管理器
通过实现__enter__和__exit__方法,可以创建更复杂的上下文管理器,支持异常处理、状态管理和资源清理。
import time
from typing import Any, Optional
class Timer:
"""计时上下文管理器类"""
def __init__(self, name: str = "代码块"):
self.name = name
self.start_time: Optional[float] = None
self.end_time: Optional[float] = None
def __enter__(self):
self.start_time = time.perf_counter()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.end_time = time.perf_counter()
if exc_type is not None:
print(f"计时器 {self.name} 检测到异常: {exc_val}")
return False # 不抑制异常
return False # 不抑制异常
@property
def elapsed(self) -> float:
"""获取经过的时间"""
if self.start_time is None:
return 0.0
end = self.end_time or time.perf_counter()
return end - self.start_time
def __repr__(self) -> str:
if self.end_time:
return f"Timer({self.name}: {self.elapsed*1000:.2f}ms)"
return f"Timer({self.name}: 运行中)"
# 使用自定义计时器
print("自定义计时器示例:")
with Timer("列表推导") as t1:
result1 = [i ** 2 for i in range(1000000)]
with Timer("map函数") as t2:
result2 = list(map(lambda x: x ** 2, range(1000000)))
print(f"列表推导: {t1.elapsed*1000:.2f}ms")
print(f"map函数: {t2.elapsed*1000:.2f}ms")
print(f"性能差异: {(t1.elapsed - t2.elapsed)*1000:.2f}ms")
# 带状态管理的上下文管理器
class ConnectionPool:
"""连接池上下文管理器"""
def __init__(self, max_connections: int = 5):
self.max_connections = max_connections
self.connections = []
self.available = []
self.in_use = 0
def __enter__(self):
print(f"初始化连接池,最大连接数: {self.max_connections}")
# 预创建连接
for i in range(self.max_connections):
conn = {"id": i, "active": True}
self.connections.append(conn)
self.available.append(conn)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print(f"关闭连接池,释放 {len(self.connections)} 个连接")
self.connections.clear()
self.available.clear()
self.in_use = 0
return False
def acquire(self):
"""获取连接"""
if not self.available:
raise RuntimeError("没有可用连接")
conn = self.available.pop()
self.in_use += 1
return conn
def release(self, conn):
"""释放连接"""
if conn not in self.connections:
raise ValueError("无效连接")
self.available.append(conn)
self.in_use -= 1
@property
def stats(self):
"""获取连接池统计信息"""
return {
"total": len(self.connections),
"available": len(self.available),
"in_use": self.in_use
}
# 测试连接池
print("\n连接池示例:")
with ConnectionPool(max_connections=3) as pool:
print(f"初始状态: {pool.stats}")
# 获取连接
conn1 = pool.acquire()
conn2 = pool.acquire()
print(f"获取2个连接后: {pool.stats}")
# 释放连接
pool.release(conn1)
print(f"释放1个连接后: {pool.stats}")
# 获取更多连接
conn3 = pool.acquire()
conn4 = pool.acquire()
print(f"获取更多连接后: {pool.stats}")
# 释放所有连接
pool.release(conn2)
pool.release(conn3)
pool.release(conn4)
print(f"释放所有连接后: {pool.stats}")
实际应用:日志和监控
上下文管理器非常适合实现日志记录、性能监控和错误处理等横切关注点。
import time
import functools
from typing import Callable, Any
class PerformanceMonitor:
"""性能监控上下文管理器"""
def __init__(self, operation_name: str, log_results: bool = True):
self.operation_name = operation_name
self.log_results = log_results
self.start_time = None
self.metrics = {}
def __enter__(self):
self.start_time = time.perf_counter()
if self.log_results:
print(f"开始执行: {self.operation_name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
end_time = time.perf_counter()
self.metrics['duration'] = end_time - self.start_time
self.metrics['success'] = exc_type is None
if exc_type is not None:
self.metrics['error'] = str(exc_val)
if self.log_results:
status = "成功" if self.metrics['success'] else "失败"
print(f"{self.operation_name} {status}: {self.metrics['duration']*1000:.2f}ms")
if not self.metrics['success']:
print(f"错误: {self.metrics['error']}")
return False
def record_metric(self, key: str, value: Any):
"""记录自定义指标"""
self.metrics[key] = value
# 装饰器版本
def monitored(operation_name: str):
"""性能监控装饰器"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
with PerformanceMonitor(operation_name) as monitor:
result = func(*args, **kwargs)
monitor.record_metric('result_size', len(str(result)))
return result
return wrapper
return decorator
# 使用性能监控
print("性能监控示例:")
with PerformanceMonitor("数据处理") as monitor:
# 模拟数据处理
data = [i ** 2 for i in range(100000)]
monitor.record_metric('data_size', len(data))
# 处理数据
processed = [x * 2 for x in data]
monitor.record_metric('processed_size', len(processed))
print(f"监控指标: {monitor.metrics}")
# 使用装饰器版本
@monitored("排序算法")
def sort_data(data):
return sorted(data, reverse=True)
test_data = list(range(10000))
sorted_data = sort_data(test_data)
上下文管理器是Python中管理资源、处理异常和实现横切关注点的强大工具。通过掌握contextlib模块的高级用法,开发者可以创建更安全、更可读的代码,确保资源得到正确管理,同时简化复杂的控制流逻辑。