多进程编程
多进程编程
Python 的 multiprocessing 模块允许你创建独立的进程，每个进程都有自己的内存空间和 Python 解释器，适合 CPU 密集型任务。
创建进程
import multiprocessing
import os
def worker(name):
    """Print one line identifying the current process.

    Args:
        name: Label included in the printed line.
    """
    pid = os.getpid()
    parent_pid = os.getppid()
    print(f"进程 {name}: PID={pid}, 父PID={parent_pid}")
if __name__ == "__main__":
    # Three child processes labelled P0, P1 and P2, each running worker().
    processes = [
        multiprocessing.Process(target=worker, args=(f"P{i}",))
        for i in range(3)
    ]

    for proc in processes:
        proc.start()

    # Wait for every child to finish before printing the final message.
    for proc in processes:
        proc.join()

    print("所有进程完成")
进程池 Pool
from multiprocessing import Pool
import time
def square(n, delay=1):
    """Return ``n * n`` after sleeping to simulate a slow computation.

    Args:
        n: Number to square.
        delay: Seconds to sleep before computing the result. Defaults to 1;
            pass 0 to skip the pause.

    Returns:
        The square of ``n``.
    """
    time.sleep(delay)
    return n * n
if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]

    with Pool(processes=3) as pool:
        # map: results come back in the same order as the inputs.
        squares = pool.map(square, numbers)
        print(f"map结果: {squares}")

        # imap: lazy iterator over the results.
        lazy_squares = pool.imap(square, numbers)
        for value in lazy_squares:
            print(value)

        # apply_async: submit every call asynchronously, then collect
        # each result (waiting at most 10 seconds per result).
        pending = [pool.apply_async(square, (n,)) for n in numbers]
        collected = [job.get(timeout=10) for job in pending]
        print(f"异步结果: {collected}")
共享内存
from multiprocessing import Process, Value, Array
def increment(shared_counter, shared_array):
    """Add 100 to the shared counter and 1 to every slot of the shared array.

    Both updates are read-modify-write operations, so each one runs while
    holding the lock of the object being modified; otherwise concurrent
    processes could overwrite each other's updates.

    Args:
        shared_counter: Synchronized value exposing ``value`` and
            ``get_lock()`` (e.g. ``multiprocessing.Value``).
        shared_array: Synchronized array supporting ``len()``, indexing and
            ``get_lock()`` (e.g. ``multiprocessing.Array``).
    """
    for _ in range(100):
        with shared_counter.get_lock():
            shared_counter.value += 1

    # `shared_array[i] += 1` is a separate read followed by a write, so it
    # must be guarded just like the counter above.
    with shared_array.get_lock():
        for index in range(len(shared_array)):
            shared_array[index] += 1
if __name__ == "__main__":
    # Shared integer, initially 0.
    counter = Value('i', 0)
    # Shared integer array with three slots, all initially 0.
    array = Array('i', [0, 0, 0])

    # Four processes all run increment() on the same counter and array.
    workers = [
        Process(target=increment, args=(counter, array))
        for _ in range(4)
    ]

    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()

    print(f"计数器: {counter.value}")  # expected: 400
    print(f"数组: {list(array)}")  # expected: [4, 4, 4]
进程间通信 Queue
from multiprocessing import Process, Queue
import time
def producer(queue, count=5, interval=0.1):
    """Put numbered messages on ``queue``, then a ``None`` end marker.

    Args:
        queue: Object with a ``put()`` method that receives the messages.
        count: Number of messages to produce. Defaults to 5.
        interval: Seconds to sleep after each message. Defaults to 0.1.
    """
    for i in range(count):
        message = f"消息-{i}"
        queue.put(message)
        print(f"生产: {message}")
        time.sleep(interval)

    # None tells the consumer that no more messages will follow.
    queue.put(None)
def consumer(queue):
    """Print every item taken from ``queue`` until ``None`` is received.

    Args:
        queue: Object with a blocking ``get()`` method.
    """
    item = queue.get()
    while item is not None:
        print(f"消费: {item}")
        item = queue.get()
if __name__ == "__main__":
    channel = Queue()

    # The producer is started (and joined) first, then the consumer.
    workers = [
        Process(target=producer, args=(channel,)),
        Process(target=consumer, args=(channel,)),
    ]

    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
Pipe 管道通信
from multiprocessing import Process, Pipe
def sender(conn):
    """Send two dict messages over ``conn`` and then close it.

    Args:
        conn: Connection object providing ``send()`` and ``close()``.
    """
    messages = (
        {"type": "greeting", "data": "你好"},
        {"type": "data", "data": [1, 2, 3]},
    )
    for message in messages:
        conn.send(message)
    conn.close()
def receiver(conn):
    """Print every message received on ``conn`` until ``EOFError`` is raised.

    Args:
        conn: Connection object providing ``recv()``.
    """
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            # Nothing left to receive: stop.
            return
        print(f"收到: {msg}")
if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(parent_conn,))
    p2 = Process(target=receiver, args=(child_conn,))

    p1.start()
    # recv() raises EOFError only once every handle to the sending end has
    # been closed. Close this process's copy now, before the receiver is
    # started, so the receiver cannot inherit an open copy either (as it
    # would with the fork start method). Without this the receiver blocks
    # forever and p2.join() never returns.
    parent_conn.close()

    p2.start()
    # The main process never reads from the pipe, so release its copy of
    # the receiving end as well.
    child_conn.close()

    p1.join()
    p2.join()
Manager 共享复杂对象
from multiprocessing import Process, Manager
def worker(shared_dict, shared_list):
    """Store one entry in the shared dict and append 1 to the shared list.

    Args:
        shared_dict: Mapping that supports item assignment.
        shared_list: Sequence that supports ``append()``.
    """
    key, value = "key", "value"
    shared_dict[key] = value
    shared_list.append(1)
if __name__ == "__main__":
    with Manager() as manager:
        # Dict and list created through the manager and handed to each child.
        shared_dict = manager.dict()
        shared_list = manager.list()

        workers = [
            Process(target=worker, args=(shared_dict, shared_list))
            for _ in range(3)
        ]

        for proc in workers:
            proc.start()
        for proc in workers:
            proc.join()

        # Copy the shared objects into plain containers while still inside
        # the manager's `with` block.
        print(dict(shared_dict))  # expected: {'key': 'value'}
        print(list(shared_list))  # expected: [1, 1, 1]
使用场景
- CPU密集型计算:利用多核CPU并行计算
- 大数据处理:分块处理大型数据集
- 任务隔离：每个进程相互独立，一个进程崩溃不会影响其他进程
- 突破GIL限制:真正的并行执行
多进程编程是处理CPU密集型任务的首选方案。