← 返回首页
⚙️

多进程编程

📂 python ⏱ 2 min 342 words

多进程编程

Python的 multiprocessing 模块允许你创建独立的进程,每个进程有自己的内存空间和Python解释器,适合CPU密集型任务。

创建进程

import multiprocessing
import os

def worker(name):
    print(f"进程 {name}: PID={os.getpid()}, 父PID={os.getppid()}")

if __name__ == "__main__":
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(f"P{i}",))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("所有进程完成")

进程池 Pool

from multiprocessing import Pool
import time

def square(n):
    time.sleep(1)
    return n * n

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]

    with Pool(processes=3) as pool:
        # map - 保持顺序
        results = pool.map(square, numbers)
        print(f"map结果: {results}")

        # imap - 懒加载迭代器
        for result in pool.imap(square, numbers):
            print(result)

        # apply_async - 异步执行
        async_results = [pool.apply_async(square, (n,)) for n in numbers]
        results = [r.get(timeout=10) for r in async_results]
        print(f"异步结果: {results}")

共享内存

from multiprocessing import Process, Value, Array

def increment(shared_counter, shared_array):
    for _ in range(100):
        with shared_counter.get_lock():
            shared_counter.value += 1
    for i in range(len(shared_array)):
        shared_array[i] += 1

if __name__ == "__main__":
    counter = Value('i', 0)  # 共享整数
    array = Array('i', [0, 0, 0])  # 共享数组

    processes = [
        Process(target=increment, args=(counter, array))
        for _ in range(4)
    ]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"计数器: {counter.value}")  # 400
    print(f"数组: {list(array)}")      # [4, 4, 4]

进程间通信 Queue

from multiprocessing import Process, Queue
import time

def producer(queue):
    for i in range(5):
        queue.put(f"消息-{i}")
        print(f"生产: 消息-{i}")
        time.sleep(0.1)
    queue.put(None)

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"消费: {item}")

if __name__ == "__main__":
    queue = Queue()

    p1 = Process(target=producer, args=(queue,))
    p2 = Process(target=consumer, args=(queue,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Pipe 管道通信

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send({"type": "greeting", "data": "你好"})
    conn.send({"type": "data", "data": [1, 2, 3]})
    conn.close()

def receiver(conn):
    while True:
        try:
            msg = conn.recv()
            print(f"收到: {msg}")
        except EOFError:
            break

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()

    p1 = Process(target=sender, args=(parent_conn,))
    p2 = Process(target=receiver, args=(child_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Manager 共享复杂对象

from multiprocessing import Process, Manager

def worker(shared_dict, shared_list):
    shared_dict["key"] = "value"
    shared_list.append(1)

if __name__ == "__main__":
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()

        processes = [
            Process(target=worker, args=(shared_dict, shared_list))
            for _ in range(3)
        ]

        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(dict(shared_dict))  # {'key': 'value'}
        print(list(shared_list))  # [1, 1, 1]

使用场景

多进程编程是处理CPU密集型任务的首选方案。