并发编程进阶
并发编程进阶
concurrent.futures模块提供了高级的并发执行接口。本文将深入讲解ThreadPoolExecutor、ProcessPoolExecutor和Future对象的使用技巧。
Executor基类
所有执行器都继承自Executor基类:
from concurrent.futures import Executor
class CustomExecutor(Executor):
    """Minimal executor that runs every submitted callable synchronously.

    The base class ``Executor.submit`` is abstract and raises
    ``NotImplementedError``, so a subclass has to provide the scheduling
    logic itself instead of delegating to ``super().submit``. The inherited
    ``map()`` and context-manager support are built on top of ``submit`` and
    work unchanged with this implementation.
    """

    def submit(self, fn, *args, **kwargs):
        """Run ``fn(*args, **kwargs)`` right away and return a finished Future.

        Args:
            fn: The callable to execute.
            *args: Positional arguments forwarded to ``fn``.
            **kwargs: Keyword arguments forwarded to ``fn``.

        Returns:
            A ``concurrent.futures.Future`` that is already done. It holds
            either the return value of ``fn`` or the exception it raised.
        """
        # Local import: the surrounding snippet only imports Executor.
        from concurrent.futures import Future

        future = Future()
        # Move the future to the RUNNING state before executing, as the
        # built-in executors do.
        future.set_running_or_notify_cancel()
        try:
            outcome = fn(*args, **kwargs)
        except Exception as exc:
            # Store the error so that future.result() re-raises it.
            future.set_exception(exc)
        else:
            future.set_result(outcome)
        return future
ThreadPoolExecutor详解
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
def fetch_url(url):
    """Request *url* with a 10-second timeout and summarise the response.

    Returns a dict with three keys: "url" (the URL that was requested),
    "status" (the response's status code) and "size" (the length of the
    response content). Exceptions raised by ``requests.get`` are not caught.
    """
    resp = requests.get(url, timeout=10)
    return dict(url=url, status=resp.status_code, size=len(resp.content))
# The same three httpbin endpoints repeated 10 times: 30 URLs in total.
urls = [
    endpoint
    for _ in range(10)
    for endpoint in (
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    )
]
# Basic usage: submit one task per URL, then collect the results in
# submission order.
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_url, target) for target in urls]
    # result() blocks, so this line returns once every task has finished.
    results = [pending.result() for pending in futures]
elapsed = time.time() - start
print(f"线程池完成: {elapsed:.2f}秒")
# Use as_completed to handle each result as soon as its task finishes,
# regardless of submission order.
with ThreadPoolExecutor(max_workers=10) as executor:
    # Remember which URL each future belongs to so it can be reported later.
    future_to_url = {}
    for target in urls:
        future_to_url[executor.submit(fetch_url, target)] = target

    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            # result() re-raises whatever fetch_url raised in the worker.
            data = future.result()
            print(f"{url}: {data['status']}")
        except Exception as e:
            print(f"{url} 失败: {e}")
ProcessPoolExecutor并行计算
from concurrent.futures import ProcessPoolExecutor
import math
import os
def is_prime(n):
    """Return True if the integer *n* is prime, using trial division.

    Args:
        n: The integer to test. Values below 2 are never prime.

    Returns:
        True when no integer in ``[2, isqrt(n)]`` divides *n*, else False.
    """
    if n < 2:
        return False
    # math.isqrt is exact for arbitrarily large ints. int(math.sqrt(n)) goes
    # through a float, which loses precision above 2**53 and raises
    # OverflowError once n no longer fits in a float.
    for divisor in range(2, math.isqrt(n) + 1):
        if n % divisor == 0:
            return False
    return True
def count_primes_in_range(range_tuple):
    """Count the primes in a half-open interval.

    *range_tuple* is a ``(start, end)`` pair. Every integer ``n`` with
    ``start <= n < end`` is checked with ``is_prime`` and the number of
    hits is returned.
    """
    lower, upper = range_tuple
    return sum(1 for candidate in range(lower, upper) if is_prime(candidate))
def parallel_prime_counting(limit, workers=None):
    """Count the primes in ``[0, limit)`` using a pool of worker processes.

    The interval is split into at most *workers* contiguous chunks and each
    chunk is counted by ``count_primes_in_range`` in a separate process.

    Args:
        limit: Exclusive upper bound of the interval to search.
        workers: Number of chunks / worker processes. Defaults to the CPU
            count, or 1 if that cannot be determined.

    Returns:
        The number of primes ``p`` with ``0 <= p < limit``.

    Raises:
        ValueError: If *workers* is smaller than 1.
    """
    if workers is None:
        # os.cpu_count() returns None when the count cannot be determined.
        workers = os.cpu_count() or 1
    if workers < 1:
        raise ValueError("workers must be at least 1")
    # The smallest prime is 2, so [0, limit) holds none when limit <= 2.
    # Returning early also avoids starting a pool for an empty search.
    if limit <= 2:
        return 0

    # Ceiling division: with floor division the last `limit % workers`
    # numbers would fall outside every chunk, and limit < workers would
    # produce only empty chunks.
    chunk_size = -(-limit // workers)
    ranges = [(start, min(start + chunk_size, limit))
              for start in range(0, limit, chunk_size)]

    # There may be fewer chunks than requested workers; do not start
    # processes that would have nothing to do.
    with ProcessPoolExecutor(max_workers=len(ranges)) as executor:
        return sum(executor.map(count_primes_in_range, ranges))
# Count the primes below one million (the searched interval is [0, 1000000)).
# The __main__ guard is required for process pools: under the "spawn" start
# method each worker re-imports the main module, and unguarded top-level code
# would try to start another pool from inside every worker.
if __name__ == "__main__":
    total = parallel_prime_counting(1000000)
    print(f"素数数量: {total}")
Future对象深入
from concurrent.futures import ThreadPoolExecutor, Future
import time
import random
def long_running_task(task_id):
    """Simulate a slow task that sometimes fails.

    Sleeps for a random duration between 1 and 3 seconds. Afterwards it
    raises ``RuntimeError`` when a fresh random draw is below 0.2; otherwise
    it returns a message containing *task_id* and the time slept.
    """
    duration = random.uniform(1, 3)
    time.sleep(duration)
    succeeded = random.random() >= 0.2  # fails when the draw is below 0.2
    if succeeded:
        return f"任务{task_id}完成,耗时{duration:.2f}秒"
    raise RuntimeError(f"任务{task_id}失败")
# Create the executor directly, without a with-block.
executor = ThreadPoolExecutor(max_workers=5)
# Submit ten tasks and keep the Future that each submit() call returns.
futures = [executor.submit(long_running_task, task_id) for task_id in range(10)]
# 添加回调函数
def task_callback(future):
    """Print the outcome of a finished future.

    Prints a success line with the future's result, or a failure line with
    the exception that ``future.result()`` raised.
    """
    try:
        message = f"成功: {future.result()}"
    except Exception as exc:
        message = f"失败: {exc}"
    print(message)
# Register the callback on every future.
for future in futures:
    future.add_done_callback(task_callback)

# Wait for all tasks. result() re-raises the exception raised inside the
# task, so each call is guarded: otherwise the first failed task would abort
# the loop and executor.shutdown() would never run.
try:
    for future in futures:
        try:
            future.result()
        except RuntimeError:
            # The failure is reported by task_callback; keep waiting for
            # the remaining tasks.
            pass
finally:
    # Always release the worker threads, even on an unexpected error.
    executor.shutdown()
高级模式:带超时的执行
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
def unreliable_task():
    """Sleep for a random 0.5 to 2 seconds, then return the string "完成"."""
    delay = random.uniform(0.5, 2)
    time.sleep(delay)
    return "完成"
with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(unreliable_task)
    try:
        # Wait at most one second for the result.
        result = future.result(timeout=1)
    except TimeoutError:
        print("任务超时")
        # Best-effort only: the return value of cancel() is not checked here.
        future.cancel()
    else:
        print(f"结果: {result}")

    # Alternative: put a single overall timeout on as_completed.
    futures = [executor.submit(unreliable_task) for _ in range(5)]
    try:
        for future in as_completed(futures, timeout=2):
            print(future.result())
    except TimeoutError:
        # Raised by the iterator when not every future finished in time.
        print("部分任务仍在运行")
异常处理最佳实践
from concurrent.futures import ThreadPoolExecutor, as_completed
import traceback
def risky_operation():
    """Return a random integer from 1 to 100, or fail at random.

    Raises ``ValueError`` when the first random draw is below 0.3.
    """
    draw = random.random()
    if draw >= 0.3:
        return random.randint(1, 100)
    raise ValueError("随机错误")
def safe_wrapper(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` without letting an Exception escape.

    Returns the value returned by *func*. If *func* raises an ``Exception``,
    the full traceback is printed and ``None`` is returned instead.
    """
    try:
        outcome = func(*args, **kwargs)
    except Exception:
        # format_exc() gives the complete stack trace of the active exception.
        print(f"错误详情:\n{traceback.format_exc()}")
        return None
    return outcome
with ThreadPoolExecutor(max_workers=4) as executor:
    # executor.map() does not isolate failures: the first exception raised by
    # a task is re-raised while the results are consumed, and the rest of the
    # batch is lost. Routing each call through safe_wrapper turns a failure
    # into None (after printing its traceback), so the other results survive.
    try:
        results = list(executor.map(
            # x == 0 is falsy, so the first item yields None without
            # calling risky_operation at all.
            lambda x: safe_wrapper(risky_operation) if x else None,
            range(10)
        ))
        print(f"成功结果: {[r for r in results if r is not None]}")
    except Exception as e:
        # Only reached for errors that safe_wrapper does not absorb.
        print(f"批量执行失败: {e}")
性能调优
import os
import psutil
def get_optimal_workers():
    """Suggest a worker count for I/O-bound work.

    Starts from ``4 * cpu_count + 1`` capped at 32, treating an unknown CPU
    count as 1. When system memory usage is above 80%, the value is halved
    but not allowed to drop below 4.
    """
    cores = os.cpu_count()
    # I/O-bound pools are usually sized between 2*CPU+1 and 4*CPU+1; a
    # CPU-bound pool would simply use the core count.
    workers = min(32, (cores or 1) * 4 + 1)
    # Scale back when memory is already under pressure.
    if psutil.virtual_memory().percent > 80:
        workers = max(4, workers // 2)
    return workers
# Usage example: size a thread pool with the suggested worker count.
optimal = get_optimal_workers()
with ThreadPoolExecutor(max_workers=optimal) as executor:
    # Placeholder: I/O-bound tasks would be submitted here.
    pass
常见陷阱
- GIL限制:CPU密集型任务用ProcessPoolExecutor
- 资源泄漏:使用with语句或显式shutdown
- 异常丢失:总是检查future.result()
- 死锁风险:避免在任务中等待同一执行器内其他Future的结果(工作线程耗尽时会互相等待);也要避免在回调中提交新任务并阻塞等待其完成
- 序列化开销:减少进程间数据传输
总结
concurrent.futures提供了优雅的并发编程接口。根据任务类型选择线程池或进程池,合理管理Future对象,注意异常处理和性能调优,可以构建高效的并发程序。