← 返回首页
并行

并发编程进阶

📂 python ⏱ 3 min 470 words

并发编程进阶

concurrent.futures模块提供了高级的并发执行接口。本文将深入讲解ThreadPoolExecutor、ProcessPoolExecutor和Future对象的使用技巧。

Executor基类

所有执行器都继承自Executor基类:

from concurrent.futures import Executor

class CustomExecutor(Executor):
    def submit(self, fn, *args, **kwargs):
        # 自定义提交逻辑
        return super().submit(fn, *args, **kwargs)

ThreadPoolExecutor详解

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

def fetch_url(url):
    response = requests.get(url, timeout=10)
    return {"url": url, "status": response.status_code, "size": len(response.content)}

urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent"
] * 10  # 30个URL

# 基本用法
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_url, url) for url in urls]
    
    # 等待所有完成
    results = [f.result() for f in futures]
    
print(f"线程池完成: {time.time() - start:.2f}秒")

# 使用as_completed按完成顺序处理
with ThreadPoolExecutor(max_workers=10) as executor:
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
    
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}: {data['status']}")
        except Exception as e:
            print(f"{url} 失败: {e}")

ProcessPoolExecutor并行计算

from concurrent.futures import ProcessPoolExecutor
import math
import os

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def count_primes_in_range(range_tuple):
    start, end = range_tuple
    count = 0
    for n in range(start, end):
        if is_prime(n):
            count += 1
    return count

def parallel_prime_counting(limit, workers=None):
    if workers is None:
        workers = os.cpu_count()
    
    # 分割工作
    chunk_size = limit // workers
    ranges = [(i * chunk_size, min((i + 1) * chunk_size, limit)) 
              for i in range(workers)]
    
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(count_primes_in_range, ranges))
    
    return sum(results)

# 计算1到100万之间的素数数量
total = parallel_prime_counting(1000000)
print(f"素数数量: {total}")

Future对象深入

from concurrent.futures import ThreadPoolExecutor, Future
import time
import random

def long_running_task(task_id):
    duration = random.uniform(1, 3)
    time.sleep(duration)
    if random.random() < 0.2:  # 20%失败率
        raise RuntimeError(f"任务{task_id}失败")
    return f"任务{task_id}完成,耗时{duration:.2f}秒"

# 创建执行器
executor = ThreadPoolExecutor(max_workers=5)

# 提交任务并获取Future
futures = []
for i in range(10):
    future = executor.submit(long_running_task, i)
    futures.append(future)

# 添加回调函数
def task_callback(future):
    try:
        result = future.result()
        print(f"成功: {result}")
    except Exception as e:
        print(f"失败: {e}")

for future in futures:
    future.add_done_callback(task_callback)

# 等待所有完成
for future in futures:
    future.result()  # 会重新抛出异常

executor.shutdown()

高级模式:带超时的执行

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def unreliable_task():
    time.sleep(random.uniform(0.5, 2))
    return "完成"

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(unreliable_task)
    
    try:
        # 等待最多1秒
        result = future.result(timeout=1)
        print(f"结果: {result}")
    except TimeoutError:
        print("任务超时")
        future.cancel()  # 尝试取消
    
    # 或者用as_completed带超时
    futures = [executor.submit(unreliable_task) for _ in range(5)]
    
    try:
        for future in as_completed(futures, timeout=2):
            print(future.result())
    except TimeoutError:
        print("部分任务仍在运行")

异常处理最佳实践

from concurrent.futures import ThreadPoolExecutor, as_completed
import traceback

def risky_operation():
    if random.random() < 0.3:
        raise ValueError("随机错误")
    return random.randint(1, 100)

def safe_wrapper(func, *args, **kwargs):
    try:
        return func(*args, **kwargs)
    except Exception as e:
        # 记录完整堆栈
        error_msg = traceback.format_exc()
        print(f"错误详情:\n{error_msg}")
        return None

with ThreadPoolExecutor(max_workers=4) as executor:
    # 使用map自动处理异常
    try:
        results = list(executor.map(
            lambda x: risky_operation() if x else None, 
            range(10)
        ))
        print(f"成功结果: {[r for r in results if r is not None]}")
    except Exception as e:
        print(f"批量执行失败: {e}")

性能调优

import os
import psutil

def get_optimal_workers():
    # CPU密集型:CPU核心数
    cpu_workers = os.cpu_count()
    
    # IO密集型:通常2*CPU+1到4*CPU+1
    io_workers = min(32, (cpu_workers or 1) * 4 + 1)
    
    # 根据内存调整
    mem = psutil.virtual_memory()
    if mem.percent > 80:
        io_workers = max(4, io_workers // 2)
    
    return io_workers

# 使用示例
optimal = get_optimal_workers()
with ThreadPoolExecutor(max_workers=optimal) as executor:
    # 执行IO密集型任务
    pass

常见陷阱

  1. GIL限制:CPU密集型任务用ProcessPoolExecutor
  2. 资源泄漏:使用with语句或显式shutdown
  3. 异常丢失:总是检查future.result()
  4. 死锁风险:避免在回调中提交新任务
  5. 序列化开销:减少进程间数据传输

总结

concurrent.futures提供了优雅的并发编程接口。根据任务类型选择线程池或进程池,合理管理Future对象,注意异常处理和性能调优,可以构建高效的并发程序。